HLS input speedup, TS Stream speedup

This commit is contained in:
Thulinma 2019-02-26 14:48:15 +01:00
parent 9c3263efa2
commit c6172a96ff
3 changed files with 80 additions and 51 deletions

View file

@ -70,6 +70,8 @@ namespace TS{
Stream::Stream(bool _threaded){ Stream::Stream(bool _threaded){
threaded = _threaded; threaded = _threaded;
psCache = 0;
psCacheTid = 0;
} }
Stream::~Stream(){ Stream::~Stream(){
@ -84,6 +86,8 @@ namespace TS{
void Stream::partialClear(){ void Stream::partialClear(){
tthread::lock_guard<tthread::recursive_mutex> guard(tMutex); tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
pesStreams.clear(); pesStreams.clear();
psCacheTid = 0;
psCache = 0;
pesPositions.clear(); pesPositions.clear();
outPackets.clear(); outPackets.clear();
buildPacket.clear(); buildPacket.clear();
@ -130,10 +134,16 @@ namespace TS{
tthread::lock_guard<tthread::recursive_mutex> guard(tMutex); tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
uint32_t tid = newPack.getPID(); uint32_t tid = newPack.getPID();
bool unitStart = newPack.getUnitStart(); bool unitStart = newPack.getUnitStart();
std::deque<Packet> & PS = pesStreams[tid]; static uint32_t wantPrev = 0;
if ((unitStart || PS.size()) && bool wantTrack = ((wantPrev == tid) || (tid == 0 || newPack.isPMT() || pidToCodec.count(tid)));
(tid == 0 || newPack.isPMT() || pidToCodec.count(tid))){ if (!wantTrack){return;}
PS.push_back(newPack); if (psCacheTid != tid || !psCache){
psCache = &(pesStreams[tid]);
psCacheTid = tid;
}
if (unitStart || !psCache->empty()){
wantPrev = tid;
psCache->push_back(newPack);
if (unitStart){ if (unitStart){
pesPositions[tid].push_back(bytePos); pesPositions[tid].push_back(bytePos);
++(seenUnitStart[tid]); ++(seenUnitStart[tid]);
@ -154,13 +164,16 @@ namespace TS{
if (!pesStreams.count(tid) || pesStreams[tid].size() == 0){ if (!pesStreams.count(tid) || pesStreams[tid].size() == 0){
return; return;
} }
std::deque<Packet> &trackPackets = pesStreams[tid]; if (psCacheTid != tid || !psCache){
psCache = &(pesStreams[tid]);
psCacheTid = tid;
}
// Handle PAT packets // Handle PAT packets
if (tid == 0){ if (tid == 0){
///\todo Keep track of updates in PAT instead of keeping only the last PAT as a reference ///\todo Keep track of updates in PAT instead of keeping only the last PAT as a reference
associationTable = trackPackets.back(); associationTable = psCache->back();
associationTable.parsePIDs(); associationTable.parsePIDs();
lastPAT = Util::bootSecs(); lastPAT = Util::bootSecs();
@ -168,6 +181,8 @@ namespace TS{
for (size_t i = 0; i < pmtCount; i++){pmtTracks.insert(associationTable.getProgramPID(i));} for (size_t i = 0; i < pmtCount; i++){pmtTracks.insert(associationTable.getProgramPID(i));}
pesStreams.erase(0); pesStreams.erase(0);
psCacheTid = 0;
psCache = 0;
return; return;
} }
@ -178,7 +193,7 @@ namespace TS{
if (pmtTracks.count(tid)){ if (pmtTracks.count(tid)){
///\todo Keep track of updates in PMT instead of keeping only the last PMT per program as a ///\todo Keep track of updates in PMT instead of keeping only the last PMT per program as a
/// reference /// reference
mappingTable[tid] = trackPackets.back(); mappingTable[tid] = psCache->back();
lastPMT[tid] = Util::bootSecs(); lastPMT[tid] = Util::bootSecs();
ProgramMappingEntry entry = mappingTable[tid].getEntry(0); ProgramMappingEntry entry = mappingTable[tid].getEntry(0);
while (entry){ while (entry){
@ -203,6 +218,8 @@ namespace TS{
} }
pesStreams.erase(tid); pesStreams.erase(tid);
psCacheTid = 0;
psCache = 0;
return; return;
} }
@ -246,8 +263,7 @@ namespace TS{
bool Stream::hasPacket(size_t tid) const { bool Stream::hasPacket(size_t tid) const {
tthread::lock_guard<tthread::recursive_mutex> guard(tMutex); tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
std::map<size_t, std::deque<Packet> >::const_iterator pesIt = pesStreams.find(tid); if (psCacheTid != tid && pesStreams.find(tid) == pesStreams.end()){
if (pesIt == pesStreams.end()){
return false; return false;
} }
if (outPackets.count(tid) && outPackets.at(tid).size()){ if (outPackets.count(tid) && outPackets.at(tid).size()){
@ -300,27 +316,30 @@ namespace TS{
if (!pidToCodec.count(tid)){ if (!pidToCodec.count(tid)){
return; // skip unknown codecs return; // skip unknown codecs
} }
std::deque<Packet> &inStream = pesStreams[tid]; if (psCacheTid != tid || !psCache){
if (inStream.size() <= 1){ psCache = &(pesStreams[tid]);
psCacheTid = tid;
}
if (psCache->size() <= 1){
if (!finished){FAIL_MSG("No PES packets to parse");} if (!finished){FAIL_MSG("No PES packets to parse");}
return; return;
} }
// Find number of packets before unit Start // Find number of packets before unit Start
size_t packNum = 1; size_t packNum = 1;
std::deque<Packet>::iterator curPack = inStream.begin(); std::deque<Packet>::iterator curPack = psCache->begin();
if (seenUnitStart[tid] == 2 && inStream.begin()->getUnitStart() && inStream.rbegin()->getUnitStart()){ if (seenUnitStart[tid] == 2 && psCache->begin()->getUnitStart() && psCache->rbegin()->getUnitStart()){
packNum = inStream.size() - 1; packNum = psCache->size() - 1;
curPack = inStream.end(); curPack = psCache->end();
curPack--; curPack--;
}else{ }else{
curPack++; curPack++;
while (curPack != inStream.end() && !curPack->getUnitStart()){ while (curPack != psCache->end() && !curPack->getUnitStart()){
curPack++; curPack++;
packNum++; packNum++;
} }
} }
if (!finished && curPack == inStream.end()){ if (!finished && curPack == psCache->end()){
FAIL_MSG("No PES packets to parse (%" PRIu32 ")", seenUnitStart[tid]); FAIL_MSG("No PES packets to parse (%" PRIu32 ")", seenUnitStart[tid]);
return; return;
} }
@ -335,7 +354,7 @@ namespace TS{
uint32_t paySize = 0; uint32_t paySize = 0;
// Loop over the packets we need, and calculate the total payload size // Loop over the packets we need, and calculate the total payload size
curPack = inStream.begin(); curPack = psCache->begin();
int lastCtr = curPack->getContinuityCounter() - 1; int lastCtr = curPack->getContinuityCounter() - 1;
for (size_t i = 0; i < packNum; i++){ for (size_t i = 0; i < packNum; i++){
if (curPack->getContinuityCounter() == lastCtr){ if (curPack->getContinuityCounter() == lastCtr){
@ -356,7 +375,7 @@ namespace TS{
} }
paySize = 0; paySize = 0;
curPack = inStream.begin(); curPack = psCache->begin();
lastCtr = curPack->getContinuityCounter() - 1; lastCtr = curPack->getContinuityCounter() - 1;
for (int i = 0; i < packNum; i++){ for (int i = 0; i < packNum; i++){
if (curPack->getContinuityCounter() == lastCtr){ if (curPack->getContinuityCounter() == lastCtr){
@ -372,7 +391,7 @@ namespace TS{
paySize += curPack->getPayloadLength(); paySize += curPack->getPayloadLength();
curPack++; curPack++;
} }
inStream.erase(inStream.begin(), curPack); psCache->erase(psCache->begin(), curPack);
// we now have the whole PES packet in char* payload, with a total size of paySize (including // we now have the whole PES packet in char* payload, with a total size of paySize (including
// headers) // headers)
@ -559,41 +578,37 @@ namespace TS{
FAIL_MSG("No startcode in packet @ %" PRIu64 " ms, and time is not equal to %" PRIu64 " ms so can't merge", timeStamp, buildPacket[tid].getTime()); FAIL_MSG("No startcode in packet @ %" PRIu64 " ms, and time is not equal to %" PRIu64 " ms so can't merge", timeStamp, buildPacket[tid].getTime());
return; return;
} }
DTSC::Packet & bp = buildPacket[tid];
if (alignment){ if (alignment){
// If the timestamp differs from current PES timestamp, send the previous packet out and // If the timestamp differs from current PES timestamp, send the previous packet out and
// fill a new one. // fill a new one.
if (buildPacket[tid].getTime() != timeStamp){ if (bp.getTime() != timeStamp){
// Add the finished DTSC packet to our output buffer // Add the finished DTSC packet to our output buffer
out.push_back(buildPacket[tid]); out.push_back(bp);
size_t size; size_t size;
char * tmp ; char * tmp ;
buildPacket[tid].getString("data", tmp, size); bp.getString("data", tmp, size);
INFO_MSG("buildpacket: size: %zu, timestamp: %" PRIu64, size, buildPacket[tid].getTime()) INFO_MSG("buildpacket: size: %zu, timestamp: %" PRIu64, size, bp.getTime())
// Create a new empty packet with the key frame bit set to true // Create a new empty packet with the key frame bit set to true
buildPacket[tid].null(); bp.null();
buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true); bp.genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
buildPacket[tid].setKeyFrame(false); bp.setKeyFrame(false);
} }
if (!buildPacket.count(tid)){
buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
buildPacket[tid].setKeyFrame(false);
}
// Check if this is a keyframe // Check if this is a keyframe
parseNal(tid, pesPayload, nextPtr, isKeyFrame); parseNal(tid, pesPayload, nextPtr, isKeyFrame);
// If yes, set the keyframe flag // If yes, set the keyframe flag
if (isKeyFrame){ if (isKeyFrame){
buildPacket[tid].setKeyFrame(true); bp.setKeyFrame(true);
} }
// No matter what, now append the current NAL unit to the current packet // No matter what, now append the current NAL unit to the current packet
buildPacket[tid].appendNal(pesPayload, nalSize); bp.appendNal(pesPayload, nalSize);
}else{ }else{
buildPacket[tid].upgradeNal(pesPayload, nalSize); bp.upgradeNal(pesPayload, nalSize);
return; return;
} }
} }
@ -609,32 +624,26 @@ namespace TS{
buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true); buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
buildPacket[tid].setKeyFrame(false); buildPacket[tid].setKeyFrame(false);
} }
DTSC::Packet & bp = buildPacket[tid];
// Check if this is a keyframe // Check if this is a keyframe
parseNal(tid, pesPayload, nextPtr, isKeyFrame); parseNal(tid, pesPayload, nextPtr, isKeyFrame);
// If yes, set the keyframe flag // If yes, set the keyframe flag
if (isKeyFrame){ if (isKeyFrame){
buildPacket[tid].setKeyFrame(true); bp.setKeyFrame(true);
} }
// If the timestamp differs from current PES timestamp, send the previous packet out and // If the timestamp differs from current PES timestamp, send the previous packet out and
// fill a new one. // fill a new one.
if (buildPacket[tid].getTime() != timeStamp){ if (bp.getTime() != timeStamp){
// Add the finished DTSC packet to our output buffer // Add the finished DTSC packet to our output buffer
out.push_back(buildPacket[tid]); out.push_back(bp);
bp.null();
size_t size; bp.genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
char * tmp ; bp.setKeyFrame(false);
buildPacket[tid].getString("data", tmp, size);
// INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime())
// Create a new empty packet with the key frame bit set to true
buildPacket[tid].null();
buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
buildPacket[tid].setKeyFrame(false);
} }
// No matter what, now append the current NAL unit to the current packet // No matter what, now append the current NAL unit to the current packet
buildPacket[tid].appendNal(pesPayload, nalSize); bp.appendNal(pesPayload, nalSize);
} }
if (((nextPtr - pesPayload) + 3) >= realPayloadSize){return;}//end of the line if (((nextPtr - pesPayload) + 3) >= realPayloadSize){return;}//end of the line
@ -1011,6 +1020,8 @@ namespace TS{
void Stream::eraseTrack(size_t tid){ void Stream::eraseTrack(size_t tid){
tthread::lock_guard<tthread::recursive_mutex> guard(tMutex); tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
pesStreams.erase(tid); pesStreams.erase(tid);
psCacheTid = 0;
psCache = 0;
pesPositions.erase(tid); pesPositions.erase(tid);
outPackets.erase(tid); outPackets.erase(tid);
} }

View file

@ -71,6 +71,8 @@ namespace TS{
std::map<size_t, ProgramMappingTable> mappingTable; std::map<size_t, ProgramMappingTable> mappingTable;
std::map<size_t, std::deque<Packet> > pesStreams; std::map<size_t, std::deque<Packet> > pesStreams;
std::deque<Packet> * psCache; /// Used only for internal speed optimizes.
uint32_t psCacheTid; /// Used only for internal speed optimizes.
std::map<size_t, std::deque<uint64_t> > pesPositions; std::map<size_t, std::deque<uint64_t> > pesPositions;
std::map<size_t, std::deque<DTSC::Packet> > outPackets; std::map<size_t, std::deque<DTSC::Packet> > outPackets;
std::map<size_t, DTSC::Packet> buildPacket; std::map<size_t, DTSC::Packet> buildPacket;

View file

@ -775,9 +775,25 @@ namespace Mist{
return pidMapping[(((uint64_t)playlistId) << 32) + id]; return pidMapping[(((uint64_t)playlistId) << 32) + id];
} }
uint32_t inputHLS::getMappedTrackId(uint64_t id){return (pidMappingR[id] & 0xFFFFFFFFull);} uint32_t inputHLS::getMappedTrackId(uint64_t id){
static uint64_t lastIn = id;
static uint32_t lastOut = (pidMappingR[id] & 0xFFFFFFFFull);
if (lastIn != id){
lastIn = id;
lastOut = (pidMappingR[id] & 0xFFFFFFFFull);
}
return lastOut;
}
uint32_t inputHLS::getMappedTrackPlaylist(uint64_t id){return (pidMappingR[id] >> 32);} uint32_t inputHLS::getMappedTrackPlaylist(uint64_t id){
static uint64_t lastIn = id;
static uint32_t lastOut = (pidMappingR[id] >> 32);
if (lastIn != id){
lastIn = id;
lastOut = (pidMappingR[id] >> 32);
}
return lastOut;
}
/// Parses the main playlist, possibly containing variants. /// Parses the main playlist, possibly containing variants.
bool inputHLS::initPlaylist(const std::string &uri){ bool inputHLS::initPlaylist(const std::string &uri){