Added http://*.ts and http-ts://* URL support to TS input, siginificant TS parsing/input speed upgrades, various other related fixes and sundry

This commit is contained in:
Thulinma 2017-07-25 11:52:35 +02:00
parent e608ef69fd
commit 20b3010e75
4 changed files with 297 additions and 315 deletions

View file

@ -10,7 +10,7 @@
namespace TS{ namespace TS{
void ADTSRemainder::setRemainder(const aac::adts &p, const void *source, const uint32_t avail, void ADTSRemainder::setRemainder(const aac::adts &p, const void *source, const uint32_t avail,
const uint32_t bPos){ const uint64_t bPos){
if (!p.getCompleteSize()){return;} if (!p.getCompleteSize()){return;}
if (max < p.getCompleteSize()){ if (max < p.getCompleteSize()){
@ -62,26 +62,16 @@ namespace TS{
uint32_t ADTSRemainder::getLength(){return len;} uint32_t ADTSRemainder::getLength(){return len;}
uint32_t ADTSRemainder::getBpos(){return bpos;} uint64_t ADTSRemainder::getBpos(){return bpos;}
uint32_t ADTSRemainder::getTodo(){return len - now;} uint32_t ADTSRemainder::getTodo(){return len - now;}
char *ADTSRemainder::getData(){return data;} char *ADTSRemainder::getData(){return data;}
Stream::Stream(bool _threaded){ Stream::Stream(bool _threaded){
threaded = _threaded; threaded = _threaded;
if (threaded){
globalSem.open("MstTSInputLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!globalSem){
FAIL_MSG("Creating semaphore failed: %s", strerror(errno));
threaded = false;
DEBUG_MSG(DLVL_FAIL, "Creating semaphore failed: %s", strerror(errno));
return;
}
}
} }
Stream::~Stream(){ Stream::~Stream(){
if (threaded){globalSem.unlink();}
} }
void Stream::parse(char *newPack, unsigned long long bytePos){ void Stream::parse(char *newPack, unsigned long long bytePos){
@ -91,17 +81,17 @@ namespace TS{
} }
void Stream::partialClear(){ void Stream::partialClear(){
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
pesStreams.clear(); pesStreams.clear();
pesPositions.clear(); pesPositions.clear();
outPackets.clear(); outPackets.clear();
buildPacket.clear(); buildPacket.clear();
if (threaded){globalSem.post();} seenUnitStart.clear();
} }
void Stream::clear(){ void Stream::clear(){
tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
partialClear(); partialClear();
if (threaded){globalSem.wait();}
pidToCodec.clear(); pidToCodec.clear();
adtsInfo.clear(); adtsInfo.clear();
spsInfo.clear(); spsInfo.clear();
@ -115,10 +105,10 @@ namespace TS{
pmtTracks.clear(); pmtTracks.clear();
remainders.clear(); remainders.clear();
associationTable = ProgramAssociationTable(); associationTable = ProgramAssociationTable();
if (threaded){globalSem.post();}
} }
void Stream::finish(){ void Stream::finish(){
tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
if (!pesStreams.size()){return;} if (!pesStreams.size()){return;}
for (std::map<unsigned long, std::deque<Packet> >::const_iterator i = pesStreams.begin(); for (std::map<unsigned long, std::deque<Packet> >::const_iterator i = pesStreams.begin();
@ -134,53 +124,47 @@ namespace TS{
} }
void Stream::add(Packet &newPack, unsigned long long bytePos){ void Stream::add(Packet &newPack, unsigned long long bytePos){
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
int tid = newPack.getPID(); int tid = newPack.getPID();
bool unitStart = newPack.getUnitStart();
std::deque<Packet> & PS = pesStreams[tid];
if ((pidToCodec.count(tid) || tid == 0 || newPack.isPMT()) && if ((pidToCodec.count(tid) || tid == 0 || newPack.isPMT()) &&
(pesStreams[tid].size() || newPack.getUnitStart())){ (unitStart || PS.size())){
pesStreams[tid].push_back(newPack); PS.push_back(newPack);
pesPositions[tid].push_back(bytePos); if (unitStart){
pesPositions[tid].push_back(bytePos);
++(seenUnitStart[tid]);
}
} }
if (threaded){globalSem.post();}
} }
bool Stream::isDataTrack(unsigned long tid){ bool Stream::isDataTrack(unsigned long tid){
if (tid == 0){return false;} if (tid == 0){return false;}
if (threaded){globalSem.wait();} {
bool result = !pmtTracks.count(tid); tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
if (threaded){globalSem.post();} return !pmtTracks.count(tid);
return result; }
} }
void Stream::parse(unsigned long tid){ void Stream::parse(unsigned long tid){
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
if (!pesStreams.count(tid) || pesStreams[tid].size() == 0){ if (!pesStreams.count(tid) || pesStreams[tid].size() == 0){
if (threaded){globalSem.post();}
return; return;
} }
std::deque<Packet> &trackPackets = pesStreams[tid]; std::deque<Packet> &trackPackets = pesStreams[tid];
if (threaded){globalSem.post();}
// Handle PAT packets // Handle PAT packets
if (tid == 0){ if (tid == 0){
///\todo Keep track of updates in PAT instead of keeping only the last PAT as a reference ///\todo Keep track of updates in PAT instead of keeping only the last PAT as a reference
if (threaded){globalSem.wait();}
associationTable = trackPackets.back(); associationTable = trackPackets.back();
associationTable.parsePIDs(); associationTable.parsePIDs();
lastPAT = Util::bootSecs(); lastPAT = Util::bootSecs();
if (threaded){globalSem.post();}
int pmtCount = associationTable.getProgramCount(); int pmtCount = associationTable.getProgramCount();
for (int i = 0; i < pmtCount; i++){pmtTracks.insert(associationTable.getProgramPID(i));} for (int i = 0; i < pmtCount; i++){pmtTracks.insert(associationTable.getProgramPID(i));}
if (threaded){globalSem.wait();}
pesStreams.erase(0); pesStreams.erase(0);
pesPositions.erase(0);
if (threaded){globalSem.post();}
return; return;
} }
@ -191,10 +175,8 @@ namespace TS{
if (pmtTracks.count(tid)){ if (pmtTracks.count(tid)){
///\todo Keep track of updates in PMT instead of keeping only the last PMT per program as a ///\todo Keep track of updates in PMT instead of keeping only the last PMT per program as a
/// reference /// reference
if (threaded){globalSem.wait();}
mappingTable[tid] = trackPackets.back(); mappingTable[tid] = trackPackets.back();
lastPMT[tid] = Util::bootSecs(); lastPMT[tid] = Util::bootSecs();
if (threaded){globalSem.post();}
ProgramMappingEntry entry = mappingTable[tid].getEntry(0); ProgramMappingEntry entry = mappingTable[tid].getEntry(0);
while (entry){ while (entry){
unsigned long pid = entry.getElementaryPid(); unsigned long pid = entry.getElementaryPid();
@ -215,42 +197,13 @@ namespace TS{
entry.advance(); entry.advance();
} }
if (threaded){globalSem.wait();}
pesStreams.erase(tid); pesStreams.erase(tid);
pesPositions.erase(tid);
if (threaded){globalSem.post();}
return; return;
} }
if(seenUnitStart[tid] > 1) {
parsePES(tid);
if (threaded){globalSem.wait();}
bool parsePes = false;
int packNum = 1;
// Usually we append a packet at a time, so the start code is expected to show up at the end.
std::deque<Packet> &inStream = pesStreams[tid];
if(inStream.size() > 1) {
if (inStream.rbegin()->getUnitStart()){
parsePes = true;
}else{
// But, sometimes (e.g. live) we do multiples, and need to check all of it...
std::deque<Packet>::iterator lastPack = inStream.end();
std::deque<Packet>::iterator curPack = inStream.begin();
curPack++;
while (curPack != lastPack && !curPack->getUnitStart()){
curPack++;
packNum++;
}
if (curPack != lastPack){parsePes = true;}
}
} }
if (threaded){globalSem.post();}
if (parsePes){parsePES(tid);}
} }
void Stream::parse(Packet &newPack, unsigned long long bytePos){ void Stream::parse(Packet &newPack, unsigned long long bytePos){
@ -261,11 +214,8 @@ namespace TS{
} }
bool Stream::hasPacketOnEachTrack() const{ bool Stream::hasPacketOnEachTrack() const{
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
if (!pidToCodec.size()){ if (!pidToCodec.size()){
if (threaded){globalSem.post();}
// INFO_MSG("no packet on each track 1, pidtocodec.size: %d, outpacket.size: %d", // INFO_MSG("no packet on each track 1, pidtocodec.size: %d, outpacket.size: %d",
// pidToCodec.size(), outPackets.size()); // pidToCodec.size(), outPackets.size());
return false; return false;
@ -287,42 +237,27 @@ namespace TS{
} }
} }
if (threaded){globalSem.post();}
return (!missing || (missing != pidToCodec.size() && lastTime - firstTime > 2000)); return (!missing || (missing != pidToCodec.size() && lastTime - firstTime > 2000));
} }
bool Stream::hasPacket(unsigned long tid) const{ bool Stream::hasPacket(unsigned long tid) const{
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
std::map<unsigned long, std::deque<Packet> >::const_iterator pesIt = pesStreams.find(tid); std::map<unsigned long, std::deque<Packet> >::const_iterator pesIt = pesStreams.find(tid);
if (pesIt == pesStreams.end()){ if (pesIt == pesStreams.end()){
if (threaded){globalSem.post();}
return false; return false;
} }
if (outPackets.count(tid) && outPackets.at(tid).size()){ if (outPackets.count(tid) && outPackets.at(tid).size()){
if (threaded){globalSem.post();}
return true; return true;
} }
const std::deque<Packet> & thisStream = pesIt->second; if (pidToCodec.count(tid) && seenUnitStart.count(tid) && seenUnitStart.at(tid) > 1){
std::deque<Packet>::const_iterator curPack = thisStream.begin();
std::deque<Packet>::const_iterator endPack = thisStream.end();
if (curPack != endPack){curPack++;}
while (curPack != endPack && !curPack->getUnitStart()){curPack++;}
if (curPack != endPack){
if (threaded){globalSem.post();}
return true; return true;
} }
if (threaded){globalSem.post();}
return false; return false;
} }
bool Stream::hasPacket() const{ bool Stream::hasPacket() const{
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
if (!pesStreams.size()){ if (!pesStreams.size()){
if (threaded){globalSem.post();}
return false; return false;
} }
@ -331,27 +266,18 @@ namespace TS{
outPackets.begin(); outPackets.begin();
i != outPackets.end(); i++){ i != outPackets.end(); i++){
if (i->second.size()){ if (i->second.size()){
if (threaded){globalSem.post();}
return true; return true;
} }
} }
} }
for (std::map<unsigned long, std::deque<Packet> >::const_iterator i = pesStreams.begin(); for (std::map<unsigned long, uint32_t>::const_iterator i = seenUnitStart.begin();
i != pesStreams.end(); i++){ i != seenUnitStart.end(); i++){
std::deque<Packet>::const_iterator curPack = i->second.begin(); if (pidToCodec.count(i->first) && i->second > 1){
if (curPack != i->second.end()){curPack++;}
while (curPack != i->second.end() && !curPack->getUnitStart()){curPack++;}
if (curPack != i->second.end()){
if (threaded){globalSem.post();}
return true; return true;
} }
} }
if (threaded){globalSem.post();}
return false; return false;
} }
@ -370,11 +296,9 @@ namespace TS{
if (!pidToCodec.count(tid)){ if (!pidToCodec.count(tid)){
return; // skip unknown codecs return; // skip unknown codecs
} }
if (threaded){globalSem.wait();}
std::deque<Packet> &inStream = pesStreams[tid]; std::deque<Packet> &inStream = pesStreams[tid];
std::deque<unsigned long long> &inPositions = pesPositions[tid];
if (inStream.size() <= 1){ if (inStream.size() <= 1){
if (threaded){globalSem.post();} INFO_MSG("No PES packets to parse");
return; return;
} }
// Find number of packets before unit Start // Find number of packets before unit Start
@ -382,10 +306,10 @@ namespace TS{
std::deque<Packet>::iterator curPack = inStream.begin(); std::deque<Packet>::iterator curPack = inStream.begin();
if (inStream.rbegin()->getUnitStart()){ if (seenUnitStart[tid] == 2 && inStream.begin()->getUnitStart() && inStream.rbegin()->getUnitStart()){
packNum = inStream.size() - 1; packNum = inStream.size() - 1;
curPack = inStream.end(); curPack = inStream.end();
curPack --; curPack--;
}else{ }else{
curPack++; curPack++;
while (curPack != inStream.end() && !curPack->getUnitStart()){ while (curPack != inStream.end() && !curPack->getUnitStart()){
@ -394,18 +318,28 @@ namespace TS{
} }
} }
if (!finished && curPack == inStream.end()){ if (!finished && curPack == inStream.end()){
if (threaded){globalSem.post();} INFO_MSG("No PES packets to parse (%lu)", seenUnitStart[tid]);
return; return;
} }
unsigned long long bPos = inPositions.front(); // We now know we're deleting 1 UnitStart, so we can pop the pesPositions and lower the seenUnitStart counter.
--(seenUnitStart[tid]);
std::deque<unsigned long long> &inPositions = pesPositions[tid];
uint64_t bPos = inPositions.front();
inPositions.pop_front();
// Create a buffer for the current PES, and remove it from the pesStreams buffer. // Create a buffer for the current PES, and remove it from the pesStreams buffer.
int paySize = 0; int paySize = 0;
// Loop over the packets we need, and calculate the total payload size // Loop over the packets we need, and calculate the total payload size
curPack = inStream.begin(); curPack = inStream.begin();
int lastCtr = curPack->getContinuityCounter() - 1;
for (int i = 0; i < packNum; i++){ for (int i = 0; i < packNum; i++){
if (curPack->getContinuityCounter() == lastCtr){
curPack++;
continue;
}
lastCtr = curPack->getContinuityCounter();
paySize += curPack->getPayloadLength(); paySize += curPack->getPayloadLength();
curPack++; curPack++;
} }
@ -420,14 +354,14 @@ namespace TS{
paySize = 0; paySize = 0;
curPack = inStream.begin(); curPack = inStream.begin();
int lastCtr = curPack->getContinuityCounter() - 1; lastCtr = curPack->getContinuityCounter() - 1;
for (int i = 0; i < packNum; i++){ for (int i = 0; i < packNum; i++){
if (curPack->getContinuityCounter() == lastCtr){ if (curPack->getContinuityCounter() == lastCtr){
curPack++; curPack++;
continue; continue;
} }
if (curPack->getContinuityCounter() - lastCtr != 1 && curPack->getContinuityCounter()){ if (curPack->getContinuityCounter() - lastCtr != 1 && curPack->getContinuityCounter()){
INFO_MSG("Parsing a pes on track %d, missed %d packets", tid, INFO_MSG("Parsing PES on track %d, missed %d packets", tid,
curPack->getContinuityCounter() - lastCtr - 1); curPack->getContinuityCounter() - lastCtr - 1);
} }
lastCtr = curPack->getContinuityCounter(); lastCtr = curPack->getContinuityCounter();
@ -436,8 +370,6 @@ namespace TS{
curPack++; curPack++;
} }
inStream.erase(inStream.begin(), curPack); inStream.erase(inStream.begin(), curPack);
inPositions.erase(inPositions.begin(), inPositions.begin() + packNum);
if (threaded){globalSem.post();}
// we now have the whole PES packet in char* payload, with a total size of paySize (including // we now have the whole PES packet in char* payload, with a total size of paySize (including
// headers) // headers)
@ -511,7 +443,7 @@ namespace TS{
} }
if (paySize - offset - pesOffset < realPayloadSize){ if (paySize - offset - pesOffset < realPayloadSize){
WARN_MSG("Packet loss detected, glitches will occur"); WARN_MSG("Packet loss detected (%lu != %lu), glitches will occur", (uint32_t)(paySize-offset-pesOffset), (uint32_t)realPayloadSize);
realPayloadSize = paySize - offset - pesOffset; realPayloadSize = paySize - offset - pesOffset;
} }
@ -536,7 +468,6 @@ namespace TS{
// Parse all the ADTS packets // Parse all the ADTS packets
unsigned long offsetInPes = 0; unsigned long offsetInPes = 0;
uint64_t msRead = 0; uint64_t msRead = 0;
if (threaded){globalSem.wait();}
if (remainders.count(tid) && remainders[tid].getLength()){ if (remainders.count(tid) && remainders[tid].getLength()){
offsetInPes = offsetInPes =
@ -584,15 +515,12 @@ namespace TS{
} }
} }
} }
if (threaded){globalSem.post();}
} }
if (thisCodec == ID3 || thisCodec == AC3){ if (thisCodec == ID3 || thisCodec == AC3){
if (threaded){globalSem.wait();}
out.push_back(DTSC::Packet()); out.push_back(DTSC::Packet());
out.back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize, out.back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize,
bPos, 0); bPos, 0);
if (threaded){globalSem.post();}
} }
if (thisCodec == H264 || thisCodec == H265){ if (thisCodec == H264 || thisCodec == H265){
@ -616,9 +544,7 @@ namespace TS{
// fill a new one. // fill a new one.
if (buildPacket[tid].getTime() != timeStamp){ if (buildPacket[tid].getTime() != timeStamp){
// Add the finished DTSC packet to our output buffer // Add the finished DTSC packet to our output buffer
if (threaded){globalSem.wait();}
out.push_back(buildPacket[tid]); out.push_back(buildPacket[tid]);
if (threaded){globalSem.post();}
uint32_t size; uint32_t size;
char * tmp ; char * tmp ;
@ -626,7 +552,6 @@ namespace TS{
INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime()) INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime())
if (threaded){globalSem.post();}
// Create a new empty packet with the key frame bit set to true // Create a new empty packet with the key frame bit set to true
buildPacket[tid].null(); buildPacket[tid].null();
buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true); buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
@ -676,7 +601,6 @@ namespace TS{
// fill a new one. // fill a new one.
if (buildPacket[tid].getTime() != timeStamp){ if (buildPacket[tid].getTime() != timeStamp){
// Add the finished DTSC packet to our output buffer // Add the finished DTSC packet to our output buffer
if (threaded){globalSem.wait();}
out.push_back(buildPacket[tid]); out.push_back(buildPacket[tid]);
uint32_t size; uint32_t size;
@ -684,7 +608,6 @@ namespace TS{
buildPacket[tid].getString("data", tmp, size); buildPacket[tid].getString("data", tmp, size);
// INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime()) // INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime())
if (threaded){globalSem.post();}
// Create a new empty packet with the key frame bit set to true // Create a new empty packet with the key frame bit set to true
buildPacket[tid].null(); buildPacket[tid].null();
buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true); buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true);
@ -705,34 +628,30 @@ namespace TS{
} }
void Stream::getPacket(unsigned long tid, DTSC::Packet &pack){ void Stream::getPacket(unsigned long tid, DTSC::Packet &pack){
tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
pack.null(); pack.null();
if (!hasPacket(tid)){ if (!hasPacket(tid)){
ERROR_MSG("Trying to obtain a packet on track %lu, but no full packet is available", tid); ERROR_MSG("Trying to obtain a packet on track %lu, but no full packet is available", tid);
return; return;
} }
if (threaded){globalSem.wait();}
bool packetReady = outPackets.count(tid) && outPackets[tid].size(); bool packetReady = outPackets.count(tid) && outPackets[tid].size();
if (threaded){globalSem.post();}
if (!packetReady){parse(tid);} if (!packetReady){
parse(tid);
if (threaded){globalSem.wait();} packetReady = outPackets.count(tid) && outPackets[tid].size();
packetReady = outPackets.count(tid) && outPackets[tid].size(); }
if (threaded){globalSem.post();}
if (!packetReady){ if (!packetReady){
ERROR_MSG("Track %lu: PES without valid packets?", tid); ERROR_MSG("Track %lu: PES without valid packets?", tid);
return; return;
} }
if (threaded){globalSem.wait();}
pack = outPackets[tid].front(); pack = outPackets[tid].front();
outPackets[tid].pop_front(); outPackets[tid].pop_front();
if (!outPackets[tid].size()){outPackets.erase(tid);} if (!outPackets[tid].size()){outPackets.erase(tid);}
if (threaded){globalSem.post();}
} }
void Stream::parseNal(uint32_t tid, const char *pesPayload, const char *nextPtr, void Stream::parseNal(uint32_t tid, const char *pesPayload, const char *nextPtr,
@ -773,15 +692,11 @@ namespace TS{
break; break;
} }
case 0x07:{ case 0x07:{
if (threaded){globalSem.wait();}
spsInfo[tid] = std::string(pesPayload, (nextPtr - pesPayload)); spsInfo[tid] = std::string(pesPayload, (nextPtr - pesPayload));
if (threaded){globalSem.post();}
break; break;
} }
case 0x08:{ case 0x08:{
if (threaded){globalSem.wait();}
ppsInfo[tid] = std::string(pesPayload, (nextPtr - pesPayload)); ppsInfo[tid] = std::string(pesPayload, (nextPtr - pesPayload));
if (threaded){globalSem.post();}
break; break;
} }
default: break; default: break;
@ -808,9 +723,7 @@ namespace TS{
case 32: case 32:
case 33: case 33:
case 34:{ case 34:{
if (threaded){globalSem.wait();}
hevcInfo[tid].addUnit((char *)pesPayload); // may i convert to (char *)? hevcInfo[tid].addUnit((char *)pesPayload); // may i convert to (char *)?
if (threaded){globalSem.post();}
break; break;
} }
default: break; default: break;
@ -819,7 +732,7 @@ namespace TS{
} }
void Stream::getEarliestPacket(DTSC::Packet &pack){ void Stream::getEarliestPacket(DTSC::Packet &pack){
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
pack.null(); pack.null();
unsigned long packTime = 0xFFFFFFFFull; unsigned long packTime = 0xFFFFFFFFull;
@ -832,13 +745,12 @@ namespace TS{
packTime = it->second.front().getTime(); packTime = it->second.front().getTime();
} }
} }
if (threaded){globalSem.post();}
if (packTrack){getPacket(packTrack, pack);} if (packTrack){getPacket(packTrack, pack);}
} }
void Stream::initializeMetadata(DTSC::Meta &meta, unsigned long tid, unsigned long mappingId){ void Stream::initializeMetadata(DTSC::Meta &meta, unsigned long tid, unsigned long mappingId){
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
unsigned long mId = mappingId; unsigned long mId = mappingId;
@ -944,11 +856,10 @@ namespace TS{
MEDIUM_MSG("Initialized track %lu as %s %s", it->first, meta.tracks[mId].codec.c_str(), MEDIUM_MSG("Initialized track %lu as %s %s", it->first, meta.tracks[mId].codec.c_str(),
meta.tracks[mId].type.c_str()); meta.tracks[mId].type.c_str());
} }
if (threaded){globalSem.post();}
} }
std::set<unsigned long> Stream::getActiveTracks(){ std::set<unsigned long> Stream::getActiveTracks(){
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
std::set<unsigned long> result; std::set<unsigned long> result;
// Track 0 is always active // Track 0 is always active
result.insert(0); result.insert(0);
@ -978,16 +889,14 @@ namespace TS{
} }
} }
} }
if (threaded){globalSem.post();}
return result; return result;
} }
void Stream::eraseTrack(unsigned long tid){ void Stream::eraseTrack(unsigned long tid){
if (threaded){globalSem.wait();} tthread::lock_guard<tthread::recursive_mutex> guard(tMutex);
pesStreams.erase(tid); pesStreams.erase(tid);
pesPositions.erase(tid); pesPositions.erase(tid);
outPackets.erase(tid); outPackets.erase(tid);
if (threaded){globalSem.post();}
} }
} }

View file

@ -1,6 +1,7 @@
#include "adts.h" #include "adts.h"
#include "h265.h" #include "h265.h"
#include "ts_packet.h" #include "ts_packet.h"
#include "tinythread.h"
#include <deque> #include <deque>
#include <map> #include <map>
#include <set> #include <set>
@ -16,16 +17,16 @@ namespace TS{
uint32_t max; uint32_t max;
uint32_t now; uint32_t now;
uint32_t len; uint32_t len;
uint32_t bpos; uint64_t bpos;
public: public:
void setRemainder(const aac::adts &p, const void *source, const uint32_t avail, void setRemainder(const aac::adts &p, const void *source, const uint32_t avail,
const uint32_t bPos); const uint64_t bPos);
ADTSRemainder(); ADTSRemainder();
~ADTSRemainder(); ~ADTSRemainder();
uint32_t getLength(); uint32_t getLength();
uint32_t getBpos(); uint64_t getBpos();
uint32_t getTodo(); uint32_t getTodo();
char *getData(); char *getData();
@ -78,7 +79,8 @@ namespace TS{
std::map<unsigned long, h265::initData> hevcInfo; std::map<unsigned long, h265::initData> hevcInfo;
std::map<unsigned long, std::string> metaInit; std::map<unsigned long, std::string> metaInit;
std::map<unsigned long, std::string> descriptors; std::map<unsigned long, std::string> descriptors;
mutable IPC::semaphore globalSem; std::map<unsigned long, uint32_t> seenUnitStart;
mutable tthread::recursive_mutex tMutex;
bool threaded; bool threaded;

View file

@ -1,3 +1,4 @@
#include <mist/util.h>
#include <iostream> #include <iostream>
#include <iomanip> #include <iomanip>
#include <fstream> #include <fstream>
@ -13,14 +14,14 @@
#include <mist/timing.h> #include <mist/timing.h>
#include <mist/mp4_generic.h> #include <mist/mp4_generic.h>
#include <mist/http_parser.h> #include <mist/http_parser.h>
#include <mist/downloader.h>
#include "input_ts.h" #include "input_ts.h"
#include <mist/tinythread.h> #include <mist/tinythread.h>
#include <mist/procs.h> #include <mist/procs.h>
#include <sys/stat.h> #include <sys/stat.h>
#define SEM_TS_CLAIM "/MstTSIN%s" tthread::mutex threadClaimMutex;
std::string globalStreamName; std::string globalStreamName;
TS::Stream liveStream(true); TS::Stream liveStream(true);
Util::Config * cfgPointer = NULL; Util::Config * cfgPointer = NULL;
@ -32,24 +33,14 @@ std::set<unsigned long> claimableThreads;
void parseThread(void * ignored) { void parseThread(void * ignored) {
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
int tid = -1; int tid = -1;
lock.wait(); {
if (claimableThreads.size()) { tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
tid = *claimableThreads.begin(); if (claimableThreads.size()) {
claimableThreads.erase(claimableThreads.begin()); tid = *claimableThreads.begin();
} claimableThreads.erase(claimableThreads.begin());
lock.post(); }
if (tid == -1) { if (tid == -1) {
return;
}
if (liveStream.isDataTrack(tid)){
if (!Util::startInput(globalStreamName, "push://INTERNAL_ONLY:"+cfgPointer->getString("input"), true, true)) {//manually override stream url to start the buffer
FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str());
return; return;
} }
} }
@ -59,17 +50,28 @@ void parseThread(void * ignored) {
DTSC::Meta myMeta; DTSC::Meta myMeta;
if (liveStream.isDataTrack(tid)){ if (liveStream.isDataTrack(tid)){
if (!Util::streamAlive(globalStreamName) && !Util::startInput(globalStreamName, "push://INTERNAL_ONLY:"+cfgPointer->getString("input"), true, true)) {
FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str());
return;
}
char userPageName[NAME_BUFFER_SIZE]; char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, globalStreamName.c_str()); snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, globalStreamName.c_str());
myProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); myProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
myProxy.userClient.countAsViewer = false; myProxy.userClient.countAsViewer = false;
} }
threadTimer[tid] = Util::bootSecs(); threadTimer[tid] = Util::bootSecs();
while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active && (!liveStream.isDataTrack(tid) || myProxy.userClient.isAlive())) { while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active && (!liveStream.isDataTrack(tid) || myProxy.userClient.isAlive())) {
liveStream.parse(tid); liveStream.parse(tid);
if (liveStream.hasPacket(tid)){ if (!liveStream.hasPacket(tid)){
if (liveStream.isDataTrack(tid)){
myProxy.userClient.keepAlive();
}
Util::sleep(100);
continue;
}
while (liveStream.hasPacket(tid)){
liveStream.initializeMetadata(myMeta, tid); liveStream.initializeMetadata(myMeta, tid);
DTSC::Packet pack; DTSC::Packet pack;
liveStream.getPacket(tid, pack); liveStream.getPacket(tid, pack);
@ -77,19 +79,12 @@ void parseThread(void * ignored) {
myProxy.continueNegotiate(tid, myMeta, true); myProxy.continueNegotiate(tid, myMeta, true);
myProxy.bufferLivePacket(pack, myMeta); myProxy.bufferLivePacket(pack, myMeta);
} }
lock.wait();
threadTimer[tid] = Util::bootSecs();
lock.post();
} }
if (!liveStream.hasPacket(tid)){ {
if (liveStream.isDataTrack(tid)){ tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
myProxy.userClient.keepAlive(); threadTimer[tid] = Util::bootSecs();
}
Util::sleep(100);
} }
} }
lock.wait();
std::string reason = "unknown reason"; std::string reason = "unknown reason";
if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";} if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";}
if (!cfgPointer->is_active){reason = "input shutting down";} if (!cfgPointer->is_active){reason = "input shutting down";}
@ -97,9 +92,11 @@ void parseThread(void * ignored) {
reason = "buffer disconnect"; reason = "buffer disconnect";
cfgPointer->is_active = false; cfgPointer->is_active = false;
} }
INFO_MSG("Shutting down thread because %s", reason.c_str()); INFO_MSG("Shutting down thread for %d because %s", tid, reason.c_str());
threadTimer.erase(tid); {
lock.post(); tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
threadTimer.erase(tid);
}
liveStream.eraseTrack(tid); liveStream.eraseTrack(tid);
myProxy.userClient.finish(); myProxy.userClient.finish();
} }
@ -115,10 +112,14 @@ namespace Mist {
capa["source_match"].append("stream://*.ts"); capa["source_match"].append("stream://*.ts");
capa["source_match"].append("tsudp://*"); capa["source_match"].append("tsudp://*");
capa["source_match"].append("ts-exec:*"); capa["source_match"].append("ts-exec:*");
capa["source_match"].append("http://*.ts");
capa["source_match"].append("http-ts://*");
//These can/may be set to always-on mode //These can/may be set to always-on mode
capa["always_match"].append("stream://*.ts"); capa["always_match"].append("stream://*.ts");
capa["always_match"].append("tsudp://*"); capa["always_match"].append("tsudp://*");
capa["always_match"].append("ts-exec:*"); capa["always_match"].append("ts-exec:*");
capa["always_match"].append("http://*.ts");
capa["always_match"].append("http-ts://*");
capa["priority"] = 9ll; capa["priority"] = 9ll;
capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("HEVC"); capa["codecs"][0u][0u].append("HEVC");
@ -132,15 +133,13 @@ namespace Mist {
if (inFile) { if (inFile) {
fclose(inFile); fclose(inFile);
} }
if (tcpCon){
tcpCon.close();
}
if (!standAlone){ if (!standAlone){
char semName[NAME_BUFFER_SIZE]; tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
lock.wait();
threadTimer.clear(); threadTimer.clear();
claimableThreads.clear(); claimableThreads.clear();
lock.post();
lock.unlink();
} }
} }
@ -148,51 +147,62 @@ namespace Mist {
bool inputTS::preRun() { bool inputTS::preRun() {
const std::string & inpt = config->getString("input"); const std::string & inpt = config->getString("input");
//streamed standard input //streamed standard input
if (inpt == "-" || inpt.substr(0, 8) == "ts-exec:") { if (inpt == "-") {
standAlone = false; standAlone = false;
if (inpt.size() > 1){ tcpCon = Socket::Connection(fileno(stdout), fileno(stdin));
std::string input = inpt.substr(8); return true;
char *args[128]; }
uint8_t argCnt = 0; if (inpt.substr(0, 7) == "http://" || inpt.substr(0, 10) == "http-ts://"){
char *startCh = 0; standAlone = false;
for (char *i = (char*)input.c_str(); i <= input.data() + input.size(); ++i){ HTTP::URL url(inpt);
if (!*i){ url.protocol = "http";
if (startCh){args[argCnt++] = startCh;} HTTP::Downloader DL;
break; DL.getHTTP().headerOnly = true;
} if (!DL.get(url)){
if (*i == ' '){ return false;
if (startCh){
args[argCnt++] = startCh;
startCh = 0;
*i = 0;
}
}else{
if (!startCh){startCh = i;}
}
}
args[argCnt] = 0;
int fin = -1, fout = -1, ferr = -1;
inputProcess = Util::Procs::StartPiped(args, &fin, &fout, &ferr);
inFile = fdopen(fout, "r");
}else{
inFile = stdin;
} }
tcpCon = DL.getSocket();
return true;
}
if (inpt.substr(0, 8) == "ts-exec:") {
standAlone = false;
std::string input = inpt.substr(8);
char *args[128];
uint8_t argCnt = 0;
char *startCh = 0;
for (char *i = (char*)input.c_str(); i <= input.data() + input.size(); ++i){
if (!*i){
if (startCh){args[argCnt++] = startCh;}
break;
}
if (*i == ' '){
if (startCh){
args[argCnt++] = startCh;
startCh = 0;
*i = 0;
}
}else{
if (!startCh){startCh = i;}
}
}
args[argCnt] = 0;
int fin = -1, fout = -1, ferr = -1;
inputProcess = Util::Procs::StartPiped(args, &fin, &fout, &ferr);
tcpCon = Socket::Connection(-1, fout);
return true; return true;
} }
//streamed file //streamed file
if (inpt.substr(0,9) == "stream://"){ if (inpt.substr(0,9) == "stream://"){
inFile = fopen(inpt.c_str()+9, "r"); inFile = fopen(inpt.c_str()+9, "r");
tcpCon = Socket::Connection(-1, fileno(inFile));
standAlone = false; standAlone = false;
return inFile; return inFile;
} }
//UDP input (tsudp://[host:]port[/iface[,iface[,...]]]) //UDP input (tsudp://[host:]port[/iface[,iface[,...]]])
if (inpt.substr(0, 8) == "tsudp://"){ if (inpt.substr(0, 8) == "tsudp://"){
HTTP::URL input_url(inpt);
standAlone = false; standAlone = false;
udpCon.setBlocking(false); return true;
udpCon.bind(input_url.getPort(), input_url.host, input_url.path);
return udpCon.getSock() != -1;
} }
//plain VoD file //plain VoD file
inFile = fopen(inpt.c_str(), "r"); inFile = fopen(inpt.c_str(), "r");
@ -232,33 +242,33 @@ namespace Mist {
bool inputTS::readHeader() { bool inputTS::readHeader() {
if (!inFile){return false;} if (!inFile){return false;}
TS::Packet packet;//to analyse and extract data TS::Packet packet;//to analyse and extract data
DTSC::Packet headerPack;
fseek(inFile, 0, SEEK_SET);//seek to beginning fseek(inFile, 0, SEEK_SET);//seek to beginning
long long int lastBpos = 0; uint64_t lastBpos = 0;
while (packet.FromFile(inFile) && !feof(inFile)) { while (packet.FromFile(inFile) && !feof(inFile)) {
tsStream.parse(packet, lastBpos); tsStream.parse(packet, lastBpos);
lastBpos = ftell(inFile); lastBpos = Util::ftell(inFile);
while (tsStream.hasPacketOnEachTrack()) { if (packet.getUnitStart()){
DTSC::Packet headerPack; while (tsStream.hasPacketOnEachTrack()) {
tsStream.getEarliestPacket(headerPack); tsStream.getEarliestPacket(headerPack);
if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) { if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) {
tsStream.initializeMetadata(myMeta, headerPack.getTrackId()); tsStream.initializeMetadata(myMeta, headerPack.getTrackId());
}
myMeta.update(headerPack);
} }
myMeta.update(headerPack);
} }
} }
tsStream.finish();
DTSC::Packet headerPack; INFO_MSG("Reached %s at %llu bytes", feof(inFile)?"EOF":"error", lastBpos);
tsStream.getEarliestPacket(headerPack); while (tsStream.hasPacket()) {
tsStream.getEarliestPacket(headerPack);
while (headerPack) {
if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) { if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) {
tsStream.initializeMetadata(myMeta, headerPack.getTrackId()); tsStream.initializeMetadata(myMeta, headerPack.getTrackId());
} }
myMeta.update(headerPack); myMeta.update(headerPack);
tsStream.getEarliestPacket(headerPack);
} }
fseek(inFile, 0, SEEK_SET); fseek(inFile, 0, SEEK_SET);
myMeta.toFile(config->getString("input") + ".dtsh"); myMeta.toFile(config->getString("input") + ".dtsh");
return true; return true;
@ -273,12 +283,13 @@ namespace Mist {
thisPacket.null(); thisPacket.null();
bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
while (!hasPacket && !feof(inFile) && (inputProcess == 0 || Util::Procs::childRunning(inputProcess)) && config->is_active) { while (!hasPacket && !feof(inFile) && (inputProcess == 0 || Util::Procs::childRunning(inputProcess)) && config->is_active) {
unsigned int bPos = ftell(inFile);
tsBuf.FromFile(inFile); tsBuf.FromFile(inFile);
if (selectedTracks.count(tsBuf.getPID())) { if (selectedTracks.count(tsBuf.getPID())) {
tsStream.parse(tsBuf, bPos); tsStream.parse(tsBuf, 0);//bPos == 0
if (tsBuf.getUnitStart()){
hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
}
} }
hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
} }
if (!hasPacket) { if (!hasPacket) {
return; return;
@ -300,7 +311,7 @@ namespace Mist {
void inputTS::readPMT() { void inputTS::readPMT() {
//save current file position //save current file position
int bpos = ftell(inFile); uint64_t bpos = Util::ftell(inFile);
if (fseek(inFile, 0, SEEK_SET)) { if (fseek(inFile, 0, SEEK_SET)) {
FAIL_MSG("Seek to 0 failed"); FAIL_MSG("Seek to 0 failed");
return; return;
@ -315,7 +326,7 @@ namespace Mist {
tsStream.partialClear(); tsStream.partialClear();
//Restore original file position //Restore original file position
if (fseek(inFile, bpos, SEEK_SET)) { if (Util::fseek(inFile, bpos, SEEK_SET)) {
return; return;
} }
} }
@ -324,7 +335,7 @@ namespace Mist {
void inputTS::seek(int seekTime) { void inputTS::seek(int seekTime) {
tsStream.clear(); tsStream.clear();
readPMT(); readPMT();
unsigned long seekPos = 0xFFFFFFFFull; uint64_t seekPos = 0xFFFFFFFFFFFFFFFFull;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) { for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) {
unsigned long thisBPos = 0; unsigned long thisBPos = 0;
for (std::deque<DTSC::Key>::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++) { for (std::deque<DTSC::Key>::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++) {
@ -337,63 +348,125 @@ namespace Mist {
seekPos = thisBPos; seekPos = thisBPos;
} }
} }
fseek(inFile, seekPos, SEEK_SET);//seek to the correct position Util::fseek(inFile, seekPos, SEEK_SET);//seek to the correct position
} }
void inputTS::stream() { void inputTS::stream() {
IPC::semaphore pullLock;
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!pullLock){
FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str());
return;
}
if (!pullLock.tryWait()){
WARN_MSG("A pull process for stream %s is already running", streamName.c_str());
pullLock.close();
return;
}
const std::string & inpt = config->getString("input");
if (inpt.substr(0, 8) == "tsudp://"){
HTTP::URL input_url(inpt);
udpCon.setBlocking(false);
udpCon.bind(input_url.getPort(), input_url.host, input_url.path);
if (udpCon.getSock() == -1){
FAIL_MSG("Could not open UDP socket. Aborting.");
pullLock.post();
pullLock.close();
pullLock.unlink();
return;
}
}
IPC::sharedClient statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); IPC::sharedClient statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
uint64_t downCounter = 0; uint64_t downCounter = 0;
uint64_t startTime = Util::epoch(); uint64_t startTime = Util::epoch();
uint64_t noDataSince = Util::bootSecs(); uint64_t noDataSince = Util::bootSecs();
bool gettingData = false;
bool hasStarted = false; bool hasStarted = false;
cfgPointer = config; cfgPointer = config;
globalStreamName = streamName; globalStreamName = streamName;
unsigned long long threadCheckTimer = Util::bootSecs(); unsigned long long threadCheckTimer = Util::bootSecs();
while (config->is_active) { while (config->is_active) {
if (inFile) { if (tcpCon) {
if (feof(inFile)){ if (tcpCon.spool()){
config->is_active = false; while (tcpCon.Received().available(188)){
INFO_MSG("Reached end of file on streamed input"); while (tcpCon.Received().get()[0] != 0x47 && tcpCon.Received().available(188)){
tcpCon.Received().remove(1);
}
if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){
std::string newData = tcpCon.Received().remove(188);
tsBuf.FromPointer(newData.data());
liveStream.add(tsBuf);
if (!liveStream.isDataTrack(tsBuf.getPID())){
liveStream.parse(tsBuf.getPID());
}
}
}
noDataSince = Util::bootSecs();
}else{
Util::sleep(100);
} }
int ctr = 0; if (!tcpCon){
while (ctr < 20 && tsBuf.FromFile(inFile) && !feof(inFile)){ config->is_active = false;
liveStream.add(tsBuf); INFO_MSG("End of streamed input");
downCounter += 188;
ctr++;
} }
} else { } else {
std::string leftData; std::string leftData;
bool received = false;
while (udpCon.Receive()) { while (udpCon.Receive()) {
downCounter += udpCon.data_len;
received = true;
if (!gettingData){
gettingData = true;
INFO_MSG("Now receiving UDP data...");
}
int offset = 0; int offset = 0;
//Try to read full TS Packets //Try to read full TS Packets
//Watch out! We push here to a global, in order for threads to be able to access it. //Watch out! We push here to a global, in order for threads to be able to access it.
while (offset < udpCon.data_len) { while (offset < udpCon.data_len) {
if (udpCon.data[0] == 0x47){//check for sync byte if (udpCon.data[offset] == 0x47){//check for sync byte
if (offset + 188 <= udpCon.data_len){ if (offset + 188 <= udpCon.data_len){
liveStream.add(udpCon.data + offset); tsBuf.FromPointer(udpCon.data + offset);
noDataSince = Util::bootSecs(); liveStream.add(tsBuf);
downCounter += 188; if (!liveStream.isDataTrack(tsBuf.getPID())){
liveStream.parse(tsBuf.getPID());
}
leftData.clear();
}else{ }else{
leftData.append(udpCon.data + offset, udpCon.data_len - offset); leftData.append(udpCon.data + offset, udpCon.data_len - offset);
} }
offset += 188; offset += 188;
}else{ }else{
uint32_t maxBytes = std::min((uint32_t)(188 - leftData.size()), (uint32_t)(udpCon.data_len - offset));
uint32_t numBytes = maxBytes;
VERYHIGH_MSG("%lu bytes of non-sync-byte data received", numBytes);
if (leftData.size()){ if (leftData.size()){
leftData.append(udpCon.data + offset, 1); leftData.append(udpCon.data + offset, numBytes);
if (leftData.size() >= 188){ while (leftData.size() >= 188){
liveStream.add((char*)leftData.data()); VERYHIGH_MSG("Assembled scrap packet");
noDataSince = Util::bootSecs(); tsBuf.FromPointer((char*)leftData.data());
downCounter += 188; liveStream.add(tsBuf);
if (!liveStream.isDataTrack(tsBuf.getPID())){
liveStream.parse(tsBuf.getPID());
}
leftData.erase(0, 188); leftData.erase(0, 188);
} }
} }
++offset; offset += numBytes;
} }
} }
} }
if (!received){
Util::sleep(100);
}else{
noDataSince = Util::bootSecs();
}
}
if (gettingData && Util::bootSecs() - noDataSince > 1){
gettingData = false;
INFO_MSG("No longer receiving data.");
} }
//Check for and spawn threads here. //Check for and spawn threads here.
if (Util::bootSecs() - threadCheckTimer > 2) { if (Util::bootSecs() - threadCheckTimer > 1) {
//Connect to stats for INPUT detection //Connect to stats for INPUT detection
uint64_t now = Util::epoch(); uint64_t now = Util::epoch();
if (!statsPage.getData()){ if (!statsPage.getData()){
@ -402,6 +475,9 @@ namespace Mist {
if (statsPage.getData()){ if (statsPage.getData()){
if (!statsPage.isAlive()){ if (!statsPage.isAlive()){
config->is_active = false; config->is_active = false;
pullLock.post();
pullLock.close();
pullLock.unlink();
return; return;
} }
IPC::statExchange tmpEx(statsPage.getData()); IPC::statExchange tmpEx(statsPage.getData());
@ -410,59 +486,58 @@ namespace Mist {
tmpEx.streamName(streamName); tmpEx.streamName(streamName);
tmpEx.connector("INPUT"); tmpEx.connector("INPUT");
tmpEx.up(0); tmpEx.up(0);
tmpEx.down(downCounter); tmpEx.down(downCounter + tcpCon.dataDown());
tmpEx.time(now - startTime); tmpEx.time(now - startTime);
tmpEx.lastSecond(0); tmpEx.lastSecond(0);
statsPage.keepAlive(); statsPage.keepAlive();
} }
std::set<unsigned long> activeTracks = liveStream.getActiveTracks(); std::set<unsigned long> activeTracks = liveStream.getActiveTracks();
char semName[NAME_BUFFER_SIZE]; {
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); if (hasStarted && !threadTimer.size()){
lock.wait(); if (!isAlwaysOn()){
if (hasStarted && !threadTimer.size()){ INFO_MSG("Shutting down because no active threads and we had input in the past");
if (!isAlwaysOn()){ config->is_active = false;
INFO_MSG("Shutting down because no active threads and we had input in the past"); }else{
config->is_active = false; hasStarted = false;
}else{ }
hasStarted = false; }
for (std::set<unsigned long>::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) {
if (!liveStream.isDataTrack(*it)){continue;}
if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) {
WARN_MSG("Thread for track %d timed out %d seconds ago without a clean shutdown.", *it, Util::bootSecs() - threadTimer[*it]);
threadTimer.erase(*it);
}
if (!hasStarted){
hasStarted = true;
}
if (!threadTimer.count(*it)) {
//Add to list of unclaimed threads
claimableThreads.insert(*it);
//Spawn thread here.
tthread::thread thisThread(parseThread, 0);
thisThread.detach();
}
} }
} }
for (std::set<unsigned long>::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) {
if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) {
WARN_MSG("Thread for track %d timed out %d seconds ago without a clean shutdown.", *it, Util::bootSecs() - threadTimer[*it]);
threadTimer.erase(*it);
}
if (!hasStarted){
hasStarted = true;
}
if (!threadTimer.count(*it)) {
//Add to list of unclaimed threads
claimableThreads.insert(*it);
//Spawn thread here.
tthread::thread thisThread(parseThread, 0);
thisThread.detach();
}
}
lock.post();
threadCheckTimer = Util::bootSecs(); threadCheckTimer = Util::bootSecs();
} }
if (!inFile){ if (Util::bootSecs() - noDataSince > 20){
Util::sleep(100); if (!isAlwaysOn()){
if (Util::bootSecs() - noDataSince > 20){ WARN_MSG("No packets received for 20 seconds - terminating");
if (!isAlwaysOn()){ config->is_active = false;
WARN_MSG("No packets received for 20 seconds - terminating"); }else{
config->is_active = false; noDataSince = Util::bootSecs();
}else{
noDataSince = Util::bootSecs();
}
} }
} }
} }
finish(); finish();
pullLock.post();
pullLock.close();
pullLock.unlink();
INFO_MSG("Input for stream %s closing clean", streamName.c_str()); INFO_MSG("Input for stream %s closing clean", streamName.c_str());
} }
@ -471,16 +546,12 @@ namespace Mist {
Input::finish(); Input::finish();
return; return;
} }
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
int threadCount = 0; int threadCount = 0;
do { do {
lock.wait(); {
threadCount = threadTimer.size(); tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
lock.post(); threadCount = threadTimer.size();
}
if (threadCount){ if (threadCount){
Util::sleep(100); Util::sleep(100);
} }
@ -492,7 +563,7 @@ namespace Mist {
if (!standAlone){return false;} if (!standAlone){return false;}
//otherwise, check input param //otherwise, check input param
const std::string & inpt = config->getString("input"); const std::string & inpt = config->getString("input");
if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://" && inpt.substr(0,8) != "tsudp://" && inpt.substr(0, 8) != "ts-exec:"){ if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://" && inpt.substr(0,8) != "tsudp://" && inpt.substr(0, 8) != "ts-exec:" && inpt.substr(0, 7) != "http://" && inpt.substr(0, 10) != "http-ts://"){
return true; return true;
}else{ }else{
return false; return false;

View file

@ -29,7 +29,7 @@ namespace Mist {
FILE * inFile;///<The input file with ts data FILE * inFile;///<The input file with ts data
TS::Stream tsStream;///<Used for parsing the incoming ts stream TS::Stream tsStream;///<Used for parsing the incoming ts stream
Socket::UDPConnection udpCon; Socket::UDPConnection udpCon;
std::string udpDataBuffer; Socket::Connection tcpCon;
TS::Packet tsBuf; TS::Packet tsBuf;
pid_t inputProcess; pid_t inputProcess;
}; };