add alignment flag to ts lib

This commit is contained in:
Ramkoemar 2017-07-25 14:45:20 +02:00 committed by Thulinma
parent f3cc7b0a4e
commit 7ac6388dbb
2 changed files with 119 additions and 37 deletions

View file

@ -5,6 +5,7 @@
#include "mp4_generic.h" #include "mp4_generic.h"
#include "nal.h" #include "nal.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <stdint.h>
namespace TS{ namespace TS{
@ -222,6 +223,8 @@ namespace TS{
return; return;
} }
if (threaded){globalSem.wait();} if (threaded){globalSem.wait();}
bool parsePes = false; bool parsePes = false;
@ -229,18 +232,21 @@ namespace TS{
int packNum = 1; int packNum = 1;
// Usually we append a packet at a time, so the start code is expected to show up at the end. // Usually we append a packet at a time, so the start code is expected to show up at the end.
std::deque<Packet> &inStream = pesStreams[tid]; std::deque<Packet> &inStream = pesStreams[tid];
if (inStream.rbegin()->getUnitStart()){
parsePes = true; if(inStream.size() > 1) {
}else{ if (inStream.rbegin()->getUnitStart()){
// But, sometimes (e.g. live) we do multiples, and need to check all of it... parsePes = true;
std::deque<Packet>::iterator lastPack = inStream.end(); }else{
std::deque<Packet>::iterator curPack = inStream.begin(); // But, sometimes (e.g. live) we do multiples, and need to check all of it...
curPack++; std::deque<Packet>::iterator lastPack = inStream.end();
while (curPack != lastPack && !curPack->getUnitStart()){ std::deque<Packet>::iterator curPack = inStream.begin();
curPack++; curPack++;
packNum++; while (curPack != lastPack && !curPack->getUnitStart()){
curPack++;
packNum++;
}
if (curPack != lastPack){parsePes = true;}
} }
if (curPack != lastPack){parsePes = true;}
} }
if (threaded){globalSem.post();} if (threaded){globalSem.post();}
@ -249,8 +255,9 @@ namespace TS{
void Stream::parse(Packet &newPack, unsigned long long bytePos){ void Stream::parse(Packet &newPack, unsigned long long bytePos){
add(newPack, bytePos); add(newPack, bytePos);
int tid = newPack.getPID(); if (newPack.getUnitStart()){
parse(tid); parse(newPack.getPID());
}
} }
bool Stream::hasPacketOnEachTrack() const{ bool Stream::hasPacketOnEachTrack() const{
@ -287,7 +294,8 @@ namespace TS{
bool Stream::hasPacket(unsigned long tid) const{ bool Stream::hasPacket(unsigned long tid) const{
if (threaded){globalSem.wait();} if (threaded){globalSem.wait();}
if (!pesStreams.count(tid)){ std::map<unsigned long, std::deque<Packet> >::const_iterator pesIt = pesStreams.find(tid);
if (pesIt == pesStreams.end()){
if (threaded){globalSem.post();} if (threaded){globalSem.post();}
return false; return false;
} }
@ -295,13 +303,15 @@ namespace TS{
if (threaded){globalSem.post();} if (threaded){globalSem.post();}
return true; return true;
} }
std::deque<Packet>::const_iterator curPack = pesStreams.at(tid).begin(); const std::deque<Packet> & thisStream = pesIt->second;
std::deque<Packet>::const_iterator curPack = thisStream.begin();
std::deque<Packet>::const_iterator endPack = thisStream.end();
if (curPack != pesStreams.at(tid).end()){curPack++;} if (curPack != endPack){curPack++;}
while (curPack != pesStreams.at(tid).end() && !curPack->getUnitStart()){curPack++;} while (curPack != endPack && !curPack->getUnitStart()){curPack++;}
if (curPack != pesStreams.at(tid).end()){ if (curPack != endPack){
if (threaded){globalSem.post();} if (threaded){globalSem.post();}
return true; return true;
} }
@ -371,10 +381,17 @@ namespace TS{
int packNum = 1; int packNum = 1;
std::deque<Packet>::iterator curPack = inStream.begin(); std::deque<Packet>::iterator curPack = inStream.begin();
curPack++;
while (curPack != inStream.end() && !curPack->getUnitStart()){ if (inStream.rbegin()->getUnitStart()){
packNum = inStream.size() - 1;
curPack = inStream.end();
curPack --;
}else{
curPack++; curPack++;
packNum++; while (curPack != inStream.end() && !curPack->getUnitStart()){
curPack++;
packNum++;
}
} }
if (!finished && curPack == inStream.end()){ if (!finished && curPack == inStream.end()){
if (threaded){globalSem.post();} if (threaded){globalSem.post();}
@ -396,6 +413,11 @@ namespace TS{
// allocate a buffer, do it all again, but this time also copy the data bytes over to char* // allocate a buffer, do it all again, but this time also copy the data bytes over to char*
// payload // payload
char *payload = (char *)malloc(paySize); char *payload = (char *)malloc(paySize);
if(!payload){
FAIL_MSG("cannot allocate PES packet!");
return;
}
paySize = 0; paySize = 0;
curPack = inStream.begin(); curPack = inStream.begin();
int lastCtr = curPack->getContinuityCounter() - 1; int lastCtr = curPack->getContinuityCounter() - 1;
@ -452,9 +474,11 @@ namespace TS{
// We substract PES_header_data_length, plus the 9 bytes of mandatory header bytes // We substract PES_header_data_length, plus the 9 bytes of mandatory header bytes
realPayloadSize -= (9 + pesHeader[8]); realPayloadSize -= (9 + pesHeader[8]);
// Read the metadata for this PES Packet // Read the metadata for this PES Packet
///\todo Determine keyframe-ness ///\todo Determine keyframe-ness
unsigned int timeStamp = 0; uint64_t timeStamp = 0;
int64_t timeOffset = 0; int64_t timeOffset = 0;
unsigned int pesOffset = 9; // mandatory headers unsigned int pesOffset = 9; // mandatory headers
if ((pesHeader[7] >> 6) & 0x02){// Check for PTS presence if ((pesHeader[7] >> 6) & 0x02){// Check for PTS presence
@ -469,6 +493,7 @@ namespace TS{
} }
} }
if (pesHeader[7] & 0x20){// ESCR - ignored if (pesHeader[7] & 0x20){// ESCR - ignored
pesOffset += 6; pesOffset += 6;
} }
@ -491,7 +516,7 @@ namespace TS{
} }
const char *pesPayload = pesHeader + pesOffset; const char *pesPayload = pesHeader + pesOffset;
parseBitstream(tid, pesPayload, realPayloadSize, timeStamp, timeOffset, bPos); parseBitstream(tid, pesPayload, realPayloadSize, timeStamp, timeOffset, bPos, pesHeader[6] & 0x04 );
// Shift the offset by the payload size, the mandatory headers and the optional // Shift the offset by the payload size, the mandatory headers and the optional
// headers/padding // headers/padding
@ -501,10 +526,13 @@ namespace TS{
} }
void Stream::parseBitstream(uint32_t tid, const char *pesPayload, uint32_t realPayloadSize, void Stream::parseBitstream(uint32_t tid, const char *pesPayload, uint32_t realPayloadSize,
uint64_t timeStamp, int64_t timeOffset, uint64_t bPos){ uint64_t timeStamp, int64_t timeOffset, uint64_t bPos, bool alignment){
//INFO_MSG("timestamp: %llu offset: %lld", timeStamp, timeOffset);
// Create a new (empty) DTSC Packet at the end of the buffer // Create a new (empty) DTSC Packet at the end of the buffer
if (pidToCodec[tid] == AAC){ unsigned long thisCodec = pidToCodec[tid];
std::deque<DTSC::Packet> & out = outPackets[tid];
if (thisCodec == AAC){
// Parse all the ADTS packets // Parse all the ADTS packets
unsigned long offsetInPes = 0; unsigned long offsetInPes = 0;
uint64_t msRead = 0; uint64_t msRead = 0;
@ -522,8 +550,8 @@ namespace TS{
MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str()); MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str());
adtsInfo[tid] = adtsPack; adtsInfo[tid] = adtsPack;
} }
outPackets[tid].push_back(DTSC::Packet()); out.push_back(DTSC::Packet());
outPackets[tid].back().genericFill( out.back().genericFill(
timeStamp - ((adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency()), timeStamp - ((adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency()),
timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(),
remainders[tid].getBpos(), 0); remainders[tid].getBpos(), 0);
@ -538,8 +566,8 @@ namespace TS{
MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str()); MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str());
adtsInfo[tid] = adtsPack; adtsInfo[tid] = adtsPack;
} }
outPackets[tid].push_back(DTSC::Packet()); out.push_back(DTSC::Packet());
outPackets[tid].back().genericFill(timeStamp + msRead, timeOffset, tid, out.back().genericFill(timeStamp + msRead, timeOffset, tid,
adtsPack.getPayload(), adtsPack.getPayloadSize(), bPos, adtsPack.getPayload(), adtsPack.getPayloadSize(), bPos,
0); 0);
msRead += (adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency(); msRead += (adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency();
@ -558,25 +586,71 @@ namespace TS{
} }
if (threaded){globalSem.post();} if (threaded){globalSem.post();}
} }
if (pidToCodec[tid] == ID3 || pidToCodec[tid] == AC3){ if (thisCodec == ID3 || thisCodec == AC3){
if (threaded){globalSem.wait();} if (threaded){globalSem.wait();}
outPackets[tid].push_back(DTSC::Packet()); out.push_back(DTSC::Packet());
outPackets[tid].back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize, out.back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize,
bPos, 0); bPos, 0);
if (threaded){globalSem.post();} if (threaded){globalSem.post();}
} }
if (pidToCodec[tid] == H264 || pidToCodec[tid] == H265){ if (thisCodec == H264 || thisCodec == H265){
const char *nextPtr; const char *nextPtr;
const char *pesEnd = pesPayload+realPayloadSize; const char *pesEnd = pesPayload+realPayloadSize;
bool isKeyFrame = false; bool isKeyFrame = false;
uint32_t nalSize = 0; uint32_t nalSize = 0;
nextPtr = nalu::scanAnnexB(pesPayload, realPayloadSize); nextPtr = nalu::scanAnnexB(pesPayload, realPayloadSize);
if (!nextPtr){ if (!nextPtr){
WARN_MSG("No H264 start code found in entire PES packet!"); nextPtr = pesEnd;
return; nalSize = realPayloadSize;
if(!alignment && timeStamp && buildPacket.count(tid) && timeStamp != buildPacket[tid].getTime()){
FAIL_MSG("No startcode in packet @ %llu ms, and time is not equal to %llu ms so can't merge", timeStamp, buildPacket[tid].getTime());
return;
}
if (alignment){
// If the timestamp differs from current PES timestamp, send the previous packet out and
// fill a new one.
if (buildPacket[tid].getTime() != timeStamp){
// Add the finished DTSC packet to our output buffer
if (threaded){globalSem.wait();}
out.push_back(buildPacket[tid]);
if (threaded){globalSem.post();}
uint32_t size;
char * tmp ;
buildPacket[tid].getString("data", tmp, size);
INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime())
if (threaded){globalSem.post();}
// Create a new empty packet with the key frame bit set to true
buildPacket[tid].null();
buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
buildPacket[tid].setKeyFrame(false);
}
if (!buildPacket.count(tid)){
buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
buildPacket[tid].setKeyFrame(false);
}
// Check if this is a keyframe
parseNal(tid, pesPayload, nextPtr, isKeyFrame);
// If yes, set the keyframe flag
if (isKeyFrame){
buildPacket[tid].setKeyFrame(true);
}
// No matter what, now append the current NAL unit to the current packet
buildPacket[tid].appendNal(pesPayload, nalSize);
}else{
buildPacket[tid].upgradeNal(pesPayload, nalSize);
return;
}
} }
while (nextPtr < pesEnd){ while (nextPtr < pesEnd){
@ -603,7 +677,13 @@ namespace TS{
if (buildPacket[tid].getTime() != timeStamp){ if (buildPacket[tid].getTime() != timeStamp){
// Add the finished DTSC packet to our output buffer // Add the finished DTSC packet to our output buffer
if (threaded){globalSem.wait();} if (threaded){globalSem.wait();}
outPackets[tid].push_back(buildPacket[tid]); out.push_back(buildPacket[tid]);
uint32_t size;
char * tmp ;
buildPacket[tid].getString("data", tmp, size);
// INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime())
if (threaded){globalSem.post();} if (threaded){globalSem.post();}
// Create a new empty packet with the key frame bit set to true // Create a new empty packet with the key frame bit set to true
buildPacket[tid].null(); buildPacket[tid].null();
@ -611,12 +691,14 @@ namespace TS{
buildPacket[tid].setKeyFrame(false); buildPacket[tid].setKeyFrame(false);
} }
// No matter what, now append the current NAL unit to the current packet // No matter what, now append the current NAL unit to the current packet
buildPacket[tid].appendNal(pesPayload, nalSize, nalSize); buildPacket[tid].appendNal(pesPayload, nalSize);
} }
if (((nextPtr - pesPayload) + 3) >= realPayloadSize){return;}//end of the line if (((nextPtr - pesPayload) + 3) >= realPayloadSize){return;}//end of the line
realPayloadSize -= ((nextPtr - pesPayload) + 3); // decrease the total size realPayloadSize -= ((nextPtr - pesPayload) + 3); // decrease the total size
pesPayload = nextPtr + 3; pesPayload = nextPtr + 3;
nextPtr = nalu::scanAnnexB(pesPayload, realPayloadSize); nextPtr = nalu::scanAnnexB(pesPayload, realPayloadSize);
} }
} }

View file

@ -56,7 +56,7 @@ namespace TS{
void eraseTrack(unsigned long tid); void eraseTrack(unsigned long tid);
bool isDataTrack(unsigned long tid); bool isDataTrack(unsigned long tid);
void parseBitstream(uint32_t tid, const char *pesPayload, uint32_t realPayloadSize, void parseBitstream(uint32_t tid, const char *pesPayload, uint32_t realPayloadSize,
uint64_t timeStamp, int64_t timeOffset, uint64_t bPos); uint64_t timeStamp, int64_t timeOffset, uint64_t bPos, bool alignment);
std::set<unsigned long> getActiveTracks(); std::set<unsigned long> getActiveTracks();
private: private: