From 20b3010e75395f5c5507186b66248bccf92737dc Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 25 Jul 2017 11:52:35 +0200 Subject: [PATCH] Added http://*.ts and http-ts://* URL support to TS input, siginificant TS parsing/input speed upgrades, various other related fixes and sundry --- lib/ts_stream.cpp | 205 ++++++--------------- lib/ts_stream.h | 10 +- src/input/input_ts.cpp | 395 ++++++++++++++++++++++++----------------- src/input/input_ts.h | 2 +- 4 files changed, 297 insertions(+), 315 deletions(-) diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index a25257ad..2a59b4c1 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -10,7 +10,7 @@ namespace TS{ void ADTSRemainder::setRemainder(const aac::adts &p, const void *source, const uint32_t avail, - const uint32_t bPos){ + const uint64_t bPos){ if (!p.getCompleteSize()){return;} if (max < p.getCompleteSize()){ @@ -62,26 +62,16 @@ namespace TS{ uint32_t ADTSRemainder::getLength(){return len;} - uint32_t ADTSRemainder::getBpos(){return bpos;} + uint64_t ADTSRemainder::getBpos(){return bpos;} uint32_t ADTSRemainder::getTodo(){return len - now;} char *ADTSRemainder::getData(){return data;} Stream::Stream(bool _threaded){ threaded = _threaded; - if (threaded){ - globalSem.open("MstTSInputLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!globalSem){ - FAIL_MSG("Creating semaphore failed: %s", strerror(errno)); - threaded = false; - DEBUG_MSG(DLVL_FAIL, "Creating semaphore failed: %s", strerror(errno)); - return; - } - } } Stream::~Stream(){ - if (threaded){globalSem.unlink();} } void Stream::parse(char *newPack, unsigned long long bytePos){ @@ -91,17 +81,17 @@ namespace TS{ } void Stream::partialClear(){ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); pesStreams.clear(); pesPositions.clear(); outPackets.clear(); buildPacket.clear(); - if (threaded){globalSem.post();} + seenUnitStart.clear(); } void Stream::clear(){ + tthread::lock_guard guard(tMutex); partialClear(); - if (threaded){globalSem.wait();} pidToCodec.clear(); adtsInfo.clear(); spsInfo.clear(); @@ -115,10 +105,10 @@ namespace TS{ pmtTracks.clear(); remainders.clear(); associationTable = ProgramAssociationTable(); - if (threaded){globalSem.post();} } void Stream::finish(){ + tthread::lock_guard guard(tMutex); if (!pesStreams.size()){return;} for (std::map >::const_iterator i = pesStreams.begin(); @@ -134,53 +124,47 @@ namespace TS{ } void Stream::add(Packet &newPack, unsigned long long bytePos){ - if (threaded){globalSem.wait();} - + tthread::lock_guard guard(tMutex); int tid = newPack.getPID(); + bool unitStart = newPack.getUnitStart(); + std::deque & PS = pesStreams[tid]; if ((pidToCodec.count(tid) || tid == 0 || newPack.isPMT()) && - (pesStreams[tid].size() || newPack.getUnitStart())){ - pesStreams[tid].push_back(newPack); - pesPositions[tid].push_back(bytePos); + (unitStart || PS.size())){ + PS.push_back(newPack); + if (unitStart){ + pesPositions[tid].push_back(bytePos); + ++(seenUnitStart[tid]); + } } - - if (threaded){globalSem.post();} } bool Stream::isDataTrack(unsigned long tid){ if (tid == 0){return false;} - if (threaded){globalSem.wait();} - bool result = !pmtTracks.count(tid); - if (threaded){globalSem.post();} - return result; + { + tthread::lock_guard guard(tMutex); + return !pmtTracks.count(tid); + } } void Stream::parse(unsigned long tid){ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); if (!pesStreams.count(tid) || pesStreams[tid].size() == 0){ - if (threaded){globalSem.post();} return; } std::deque &trackPackets = pesStreams[tid]; - if (threaded){globalSem.post();} - // Handle PAT packets if (tid == 0){ ///\todo Keep track of updates in PAT instead of keeping only the last PAT as a reference - if (threaded){globalSem.wait();} associationTable = trackPackets.back(); associationTable.parsePIDs(); lastPAT = Util::bootSecs(); - if (threaded){globalSem.post();} int pmtCount = associationTable.getProgramCount(); for (int i = 0; i < pmtCount; i++){pmtTracks.insert(associationTable.getProgramPID(i));} - if (threaded){globalSem.wait();} pesStreams.erase(0); - pesPositions.erase(0); - if (threaded){globalSem.post();} return; } @@ -191,10 +175,8 @@ namespace TS{ if (pmtTracks.count(tid)){ ///\todo Keep track of updates in PMT instead of keeping only the last PMT per program as a /// reference - if (threaded){globalSem.wait();} mappingTable[tid] = trackPackets.back(); lastPMT[tid] = Util::bootSecs(); - if (threaded){globalSem.post();} ProgramMappingEntry entry = mappingTable[tid].getEntry(0); while (entry){ unsigned long pid = entry.getElementaryPid(); @@ -215,42 +197,13 @@ namespace TS{ entry.advance(); } - if (threaded){globalSem.wait();} pesStreams.erase(tid); - pesPositions.erase(tid); - if (threaded){globalSem.post();} - return; } - - - if (threaded){globalSem.wait();} - - bool parsePes = false; - - int packNum = 1; - // Usually we append a packet at a time, so the start code is expected to show up at the end. - std::deque &inStream = pesStreams[tid]; - - if(inStream.size() > 1) { - if (inStream.rbegin()->getUnitStart()){ - parsePes = true; - }else{ - // But, sometimes (e.g. live) we do multiples, and need to check all of it... - std::deque::iterator lastPack = inStream.end(); - std::deque::iterator curPack = inStream.begin(); - curPack++; - while (curPack != lastPack && !curPack->getUnitStart()){ - curPack++; - packNum++; - } - if (curPack != lastPack){parsePes = true;} - } + if(seenUnitStart[tid] > 1) { + parsePES(tid); } - if (threaded){globalSem.post();} - - if (parsePes){parsePES(tid);} } void Stream::parse(Packet &newPack, unsigned long long bytePos){ @@ -261,11 +214,8 @@ namespace TS{ } bool Stream::hasPacketOnEachTrack() const{ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); if (!pidToCodec.size()){ - - if (threaded){globalSem.post();} - // INFO_MSG("no packet on each track 1, pidtocodec.size: %d, outpacket.size: %d", // pidToCodec.size(), outPackets.size()); return false; @@ -287,42 +237,27 @@ namespace TS{ } } - if (threaded){globalSem.post();} - return (!missing || (missing != pidToCodec.size() && lastTime - firstTime > 2000)); } bool Stream::hasPacket(unsigned long tid) const{ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); std::map >::const_iterator pesIt = pesStreams.find(tid); if (pesIt == pesStreams.end()){ - if (threaded){globalSem.post();} return false; } if (outPackets.count(tid) && outPackets.at(tid).size()){ - if (threaded){globalSem.post();} return true; } - const std::deque & thisStream = pesIt->second; - std::deque::const_iterator curPack = thisStream.begin(); - std::deque::const_iterator endPack = thisStream.end(); - - if (curPack != endPack){curPack++;} - - while (curPack != endPack && !curPack->getUnitStart()){curPack++;} - - if (curPack != endPack){ - if (threaded){globalSem.post();} + if (pidToCodec.count(tid) && seenUnitStart.count(tid) && seenUnitStart.at(tid) > 1){ return true; } - if (threaded){globalSem.post();} return false; } bool Stream::hasPacket() const{ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); if (!pesStreams.size()){ - if (threaded){globalSem.post();} return false; } @@ -331,27 +266,18 @@ namespace TS{ outPackets.begin(); i != outPackets.end(); i++){ if (i->second.size()){ - if (threaded){globalSem.post();} return true; } } } - for (std::map >::const_iterator i = pesStreams.begin(); - i != pesStreams.end(); i++){ - std::deque::const_iterator curPack = i->second.begin(); - - if (curPack != i->second.end()){curPack++;} - - while (curPack != i->second.end() && !curPack->getUnitStart()){curPack++;} - - if (curPack != i->second.end()){ - if (threaded){globalSem.post();} + for (std::map::const_iterator i = seenUnitStart.begin(); + i != seenUnitStart.end(); i++){ + if (pidToCodec.count(i->first) && i->second > 1){ return true; } } - if (threaded){globalSem.post();} return false; } @@ -370,11 +296,9 @@ namespace TS{ if (!pidToCodec.count(tid)){ return; // skip unknown codecs } - if (threaded){globalSem.wait();} std::deque &inStream = pesStreams[tid]; - std::deque &inPositions = pesPositions[tid]; if (inStream.size() <= 1){ - if (threaded){globalSem.post();} + INFO_MSG("No PES packets to parse"); return; } // Find number of packets before unit Start @@ -382,10 +306,10 @@ namespace TS{ std::deque::iterator curPack = inStream.begin(); - if (inStream.rbegin()->getUnitStart()){ + if (seenUnitStart[tid] == 2 && inStream.begin()->getUnitStart() && inStream.rbegin()->getUnitStart()){ packNum = inStream.size() - 1; curPack = inStream.end(); - curPack --; + curPack--; }else{ curPack++; while (curPack != inStream.end() && !curPack->getUnitStart()){ @@ -394,18 +318,28 @@ namespace TS{ } } if (!finished && curPack == inStream.end()){ - if (threaded){globalSem.post();} + INFO_MSG("No PES packets to parse (%lu)", seenUnitStart[tid]); return; } - - unsigned long long bPos = inPositions.front(); + + // We now know we're deleting 1 UnitStart, so we can pop the pesPositions and lower the seenUnitStart counter. + --(seenUnitStart[tid]); + std::deque &inPositions = pesPositions[tid]; + uint64_t bPos = inPositions.front(); + inPositions.pop_front(); // Create a buffer for the current PES, and remove it from the pesStreams buffer. int paySize = 0; // Loop over the packets we need, and calculate the total payload size curPack = inStream.begin(); + int lastCtr = curPack->getContinuityCounter() - 1; for (int i = 0; i < packNum; i++){ + if (curPack->getContinuityCounter() == lastCtr){ + curPack++; + continue; + } + lastCtr = curPack->getContinuityCounter(); paySize += curPack->getPayloadLength(); curPack++; } @@ -420,14 +354,14 @@ namespace TS{ paySize = 0; curPack = inStream.begin(); - int lastCtr = curPack->getContinuityCounter() - 1; + lastCtr = curPack->getContinuityCounter() - 1; for (int i = 0; i < packNum; i++){ if (curPack->getContinuityCounter() == lastCtr){ curPack++; continue; } if (curPack->getContinuityCounter() - lastCtr != 1 && curPack->getContinuityCounter()){ - INFO_MSG("Parsing a pes on track %d, missed %d packets", tid, + INFO_MSG("Parsing PES on track %d, missed %d packets", tid, curPack->getContinuityCounter() - lastCtr - 1); } lastCtr = curPack->getContinuityCounter(); @@ -436,8 +370,6 @@ namespace TS{ curPack++; } inStream.erase(inStream.begin(), curPack); - inPositions.erase(inPositions.begin(), inPositions.begin() + packNum); - if (threaded){globalSem.post();} // we now have the whole PES packet in char* payload, with a total size of paySize (including // headers) @@ -511,7 +443,7 @@ namespace TS{ } if (paySize - offset - pesOffset < realPayloadSize){ - WARN_MSG("Packet loss detected, glitches will occur"); + WARN_MSG("Packet loss detected (%lu != %lu), glitches will occur", (uint32_t)(paySize-offset-pesOffset), (uint32_t)realPayloadSize); realPayloadSize = paySize - offset - pesOffset; } @@ -536,7 +468,6 @@ namespace TS{ // Parse all the ADTS packets unsigned long offsetInPes = 0; uint64_t msRead = 0; - if (threaded){globalSem.wait();} if (remainders.count(tid) && remainders[tid].getLength()){ offsetInPes = @@ -584,15 +515,12 @@ namespace TS{ } } } - if (threaded){globalSem.post();} } if (thisCodec == ID3 || thisCodec == AC3){ - if (threaded){globalSem.wait();} out.push_back(DTSC::Packet()); out.back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize, bPos, 0); - if (threaded){globalSem.post();} } if (thisCodec == H264 || thisCodec == H265){ @@ -616,9 +544,7 @@ namespace TS{ // fill a new one. if (buildPacket[tid].getTime() != timeStamp){ // Add the finished DTSC packet to our output buffer - if (threaded){globalSem.wait();} out.push_back(buildPacket[tid]); - if (threaded){globalSem.post();} uint32_t size; char * tmp ; @@ -626,7 +552,6 @@ namespace TS{ INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime()) - if (threaded){globalSem.post();} // Create a new empty packet with the key frame bit set to true buildPacket[tid].null(); buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true); @@ -676,7 +601,6 @@ namespace TS{ // fill a new one. if (buildPacket[tid].getTime() != timeStamp){ // Add the finished DTSC packet to our output buffer - if (threaded){globalSem.wait();} out.push_back(buildPacket[tid]); uint32_t size; @@ -684,7 +608,6 @@ namespace TS{ buildPacket[tid].getString("data", tmp, size); // INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime()) - if (threaded){globalSem.post();} // Create a new empty packet with the key frame bit set to true buildPacket[tid].null(); buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true); @@ -705,34 +628,30 @@ namespace TS{ } void Stream::getPacket(unsigned long tid, DTSC::Packet &pack){ + tthread::lock_guard guard(tMutex); pack.null(); if (!hasPacket(tid)){ ERROR_MSG("Trying to obtain a packet on track %lu, but no full packet is available", tid); return; } - if (threaded){globalSem.wait();} bool packetReady = outPackets.count(tid) && outPackets[tid].size(); - if (threaded){globalSem.post();} - if (!packetReady){parse(tid);} - - if (threaded){globalSem.wait();} - packetReady = outPackets.count(tid) && outPackets[tid].size(); - if (threaded){globalSem.post();} + if (!packetReady){ + parse(tid); + packetReady = outPackets.count(tid) && outPackets[tid].size(); + } if (!packetReady){ ERROR_MSG("Track %lu: PES without valid packets?", tid); return; } - if (threaded){globalSem.wait();} pack = outPackets[tid].front(); outPackets[tid].pop_front(); if (!outPackets[tid].size()){outPackets.erase(tid);} - if (threaded){globalSem.post();} } void Stream::parseNal(uint32_t tid, const char *pesPayload, const char *nextPtr, @@ -773,15 +692,11 @@ namespace TS{ break; } case 0x07:{ - if (threaded){globalSem.wait();} spsInfo[tid] = std::string(pesPayload, (nextPtr - pesPayload)); - if (threaded){globalSem.post();} break; } case 0x08:{ - if (threaded){globalSem.wait();} ppsInfo[tid] = std::string(pesPayload, (nextPtr - pesPayload)); - if (threaded){globalSem.post();} break; } default: break; @@ -808,9 +723,7 @@ namespace TS{ case 32: case 33: case 34:{ - if (threaded){globalSem.wait();} hevcInfo[tid].addUnit((char *)pesPayload); // may i convert to (char *)? - if (threaded){globalSem.post();} break; } default: break; @@ -819,7 +732,7 @@ namespace TS{ } void Stream::getEarliestPacket(DTSC::Packet &pack){ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); pack.null(); unsigned long packTime = 0xFFFFFFFFull; @@ -832,13 +745,12 @@ namespace TS{ packTime = it->second.front().getTime(); } } - if (threaded){globalSem.post();} if (packTrack){getPacket(packTrack, pack);} } void Stream::initializeMetadata(DTSC::Meta &meta, unsigned long tid, unsigned long mappingId){ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); unsigned long mId = mappingId; @@ -944,11 +856,10 @@ namespace TS{ MEDIUM_MSG("Initialized track %lu as %s %s", it->first, meta.tracks[mId].codec.c_str(), meta.tracks[mId].type.c_str()); } - if (threaded){globalSem.post();} } std::set Stream::getActiveTracks(){ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); std::set result; // Track 0 is always active result.insert(0); @@ -978,16 +889,14 @@ namespace TS{ } } } - if (threaded){globalSem.post();} return result; } void Stream::eraseTrack(unsigned long tid){ - if (threaded){globalSem.wait();} + tthread::lock_guard guard(tMutex); pesStreams.erase(tid); pesPositions.erase(tid); outPackets.erase(tid); - if (threaded){globalSem.post();} } } diff --git a/lib/ts_stream.h b/lib/ts_stream.h index 21237069..0620206c 100644 --- a/lib/ts_stream.h +++ b/lib/ts_stream.h @@ -1,6 +1,7 @@ #include "adts.h" #include "h265.h" #include "ts_packet.h" +#include "tinythread.h" #include #include #include @@ -16,16 +17,16 @@ namespace TS{ uint32_t max; uint32_t now; uint32_t len; - uint32_t bpos; + uint64_t bpos; public: void setRemainder(const aac::adts &p, const void *source, const uint32_t avail, - const uint32_t bPos); + const uint64_t bPos); ADTSRemainder(); ~ADTSRemainder(); uint32_t getLength(); - uint32_t getBpos(); + uint64_t getBpos(); uint32_t getTodo(); char *getData(); @@ -78,7 +79,8 @@ namespace TS{ std::map hevcInfo; std::map metaInit; std::map descriptors; - mutable IPC::semaphore globalSem; + std::map seenUnitStart; + mutable tthread::recursive_mutex tMutex; bool threaded; diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 4e2cbf1e..c6c0f756 100755 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -13,14 +14,14 @@ #include #include #include +#include #include "input_ts.h" #include #include #include -#define SEM_TS_CLAIM "/MstTSIN%s" - +tthread::mutex threadClaimMutex; std::string globalStreamName; TS::Stream liveStream(true); Util::Config * cfgPointer = NULL; @@ -32,24 +33,14 @@ std::set claimableThreads; void parseThread(void * ignored) { - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); - IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - int tid = -1; - lock.wait(); - if (claimableThreads.size()) { - tid = *claimableThreads.begin(); - claimableThreads.erase(claimableThreads.begin()); - } - lock.post(); - if (tid == -1) { - return; - } - - if (liveStream.isDataTrack(tid)){ - if (!Util::startInput(globalStreamName, "push://INTERNAL_ONLY:"+cfgPointer->getString("input"), true, true)) {//manually override stream url to start the buffer - FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str()); + { + tthread::lock_guard guard(threadClaimMutex); + if (claimableThreads.size()) { + tid = *claimableThreads.begin(); + claimableThreads.erase(claimableThreads.begin()); + } + if (tid == -1) { return; } } @@ -59,17 +50,28 @@ void parseThread(void * ignored) { DTSC::Meta myMeta; if (liveStream.isDataTrack(tid)){ + if (!Util::streamAlive(globalStreamName) && !Util::startInput(globalStreamName, "push://INTERNAL_ONLY:"+cfgPointer->getString("input"), true, true)) { + FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str()); + return; + } + char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, globalStreamName.c_str()); myProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); myProxy.userClient.countAsViewer = false; } - threadTimer[tid] = Util::bootSecs(); while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active && (!liveStream.isDataTrack(tid) || myProxy.userClient.isAlive())) { liveStream.parse(tid); - if (liveStream.hasPacket(tid)){ + if (!liveStream.hasPacket(tid)){ + if (liveStream.isDataTrack(tid)){ + myProxy.userClient.keepAlive(); + } + Util::sleep(100); + continue; + } + while (liveStream.hasPacket(tid)){ liveStream.initializeMetadata(myMeta, tid); DTSC::Packet pack; liveStream.getPacket(tid, pack); @@ -77,19 +79,12 @@ void parseThread(void * ignored) { myProxy.continueNegotiate(tid, myMeta, true); myProxy.bufferLivePacket(pack, myMeta); } - - lock.wait(); - threadTimer[tid] = Util::bootSecs(); - lock.post(); } - if (!liveStream.hasPacket(tid)){ - if (liveStream.isDataTrack(tid)){ - myProxy.userClient.keepAlive(); - } - Util::sleep(100); + { + tthread::lock_guard guard(threadClaimMutex); + threadTimer[tid] = Util::bootSecs(); } } - lock.wait(); std::string reason = "unknown reason"; if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";} if (!cfgPointer->is_active){reason = "input shutting down";} @@ -97,9 +92,11 @@ void parseThread(void * ignored) { reason = "buffer disconnect"; cfgPointer->is_active = false; } - INFO_MSG("Shutting down thread because %s", reason.c_str()); - threadTimer.erase(tid); - lock.post(); + INFO_MSG("Shutting down thread for %d because %s", tid, reason.c_str()); + { + tthread::lock_guard guard(threadClaimMutex); + threadTimer.erase(tid); + } liveStream.eraseTrack(tid); myProxy.userClient.finish(); } @@ -115,10 +112,14 @@ namespace Mist { capa["source_match"].append("stream://*.ts"); capa["source_match"].append("tsudp://*"); capa["source_match"].append("ts-exec:*"); + capa["source_match"].append("http://*.ts"); + capa["source_match"].append("http-ts://*"); //These can/may be set to always-on mode capa["always_match"].append("stream://*.ts"); capa["always_match"].append("tsudp://*"); capa["always_match"].append("ts-exec:*"); + capa["always_match"].append("http://*.ts"); + capa["always_match"].append("http-ts://*"); capa["priority"] = 9ll; capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("HEVC"); @@ -132,15 +133,13 @@ namespace Mist { if (inFile) { fclose(inFile); } + if (tcpCon){ + tcpCon.close(); + } if (!standAlone){ - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); - IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - lock.wait(); + tthread::lock_guard guard(threadClaimMutex); threadTimer.clear(); claimableThreads.clear(); - lock.post(); - lock.unlink(); } } @@ -148,51 +147,62 @@ namespace Mist { bool inputTS::preRun() { const std::string & inpt = config->getString("input"); //streamed standard input - if (inpt == "-" || inpt.substr(0, 8) == "ts-exec:") { + if (inpt == "-") { standAlone = false; - if (inpt.size() > 1){ - std::string input = inpt.substr(8); - char *args[128]; - uint8_t argCnt = 0; - char *startCh = 0; - for (char *i = (char*)input.c_str(); i <= input.data() + input.size(); ++i){ - if (!*i){ - if (startCh){args[argCnt++] = startCh;} - break; - } - if (*i == ' '){ - if (startCh){ - args[argCnt++] = startCh; - startCh = 0; - *i = 0; - } - }else{ - if (!startCh){startCh = i;} - } - } - args[argCnt] = 0; - - int fin = -1, fout = -1, ferr = -1; - inputProcess = Util::Procs::StartPiped(args, &fin, &fout, &ferr); - inFile = fdopen(fout, "r"); - }else{ - inFile = stdin; + tcpCon = Socket::Connection(fileno(stdout), fileno(stdin)); + return true; + } + if (inpt.substr(0, 7) == "http://" || inpt.substr(0, 10) == "http-ts://"){ + standAlone = false; + HTTP::URL url(inpt); + url.protocol = "http"; + HTTP::Downloader DL; + DL.getHTTP().headerOnly = true; + if (!DL.get(url)){ + return false; } + tcpCon = DL.getSocket(); + return true; + } + if (inpt.substr(0, 8) == "ts-exec:") { + standAlone = false; + std::string input = inpt.substr(8); + char *args[128]; + uint8_t argCnt = 0; + char *startCh = 0; + for (char *i = (char*)input.c_str(); i <= input.data() + input.size(); ++i){ + if (!*i){ + if (startCh){args[argCnt++] = startCh;} + break; + } + if (*i == ' '){ + if (startCh){ + args[argCnt++] = startCh; + startCh = 0; + *i = 0; + } + }else{ + if (!startCh){startCh = i;} + } + } + args[argCnt] = 0; + + int fin = -1, fout = -1, ferr = -1; + inputProcess = Util::Procs::StartPiped(args, &fin, &fout, &ferr); + tcpCon = Socket::Connection(-1, fout); return true; } //streamed file if (inpt.substr(0,9) == "stream://"){ inFile = fopen(inpt.c_str()+9, "r"); + tcpCon = Socket::Connection(-1, fileno(inFile)); standAlone = false; return inFile; } //UDP input (tsudp://[host:]port[/iface[,iface[,...]]]) if (inpt.substr(0, 8) == "tsudp://"){ - HTTP::URL input_url(inpt); standAlone = false; - udpCon.setBlocking(false); - udpCon.bind(input_url.getPort(), input_url.host, input_url.path); - return udpCon.getSock() != -1; + return true; } //plain VoD file inFile = fopen(inpt.c_str(), "r"); @@ -232,33 +242,33 @@ namespace Mist { bool inputTS::readHeader() { if (!inFile){return false;} TS::Packet packet;//to analyse and extract data + DTSC::Packet headerPack; fseek(inFile, 0, SEEK_SET);//seek to beginning - long long int lastBpos = 0; + uint64_t lastBpos = 0; while (packet.FromFile(inFile) && !feof(inFile)) { tsStream.parse(packet, lastBpos); - lastBpos = ftell(inFile); - while (tsStream.hasPacketOnEachTrack()) { - DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) { - tsStream.initializeMetadata(myMeta, headerPack.getTrackId()); + lastBpos = Util::ftell(inFile); + if (packet.getUnitStart()){ + while (tsStream.hasPacketOnEachTrack()) { + tsStream.getEarliestPacket(headerPack); + if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) { + tsStream.initializeMetadata(myMeta, headerPack.getTrackId()); + } + myMeta.update(headerPack); } - myMeta.update(headerPack); } } - - DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - - while (headerPack) { + tsStream.finish(); + INFO_MSG("Reached %s at %llu bytes", feof(inFile)?"EOF":"error", lastBpos); + while (tsStream.hasPacket()) { + tsStream.getEarliestPacket(headerPack); if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) { tsStream.initializeMetadata(myMeta, headerPack.getTrackId()); } myMeta.update(headerPack); - tsStream.getEarliestPacket(headerPack); } - + fseek(inFile, 0, SEEK_SET); myMeta.toFile(config->getString("input") + ".dtsh"); return true; @@ -273,12 +283,13 @@ namespace Mist { thisPacket.null(); bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); while (!hasPacket && !feof(inFile) && (inputProcess == 0 || Util::Procs::childRunning(inputProcess)) && config->is_active) { - unsigned int bPos = ftell(inFile); tsBuf.FromFile(inFile); if (selectedTracks.count(tsBuf.getPID())) { - tsStream.parse(tsBuf, bPos); + tsStream.parse(tsBuf, 0);//bPos == 0 + if (tsBuf.getUnitStart()){ + hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); + } } - hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); } if (!hasPacket) { return; @@ -300,7 +311,7 @@ namespace Mist { void inputTS::readPMT() { //save current file position - int bpos = ftell(inFile); + uint64_t bpos = Util::ftell(inFile); if (fseek(inFile, 0, SEEK_SET)) { FAIL_MSG("Seek to 0 failed"); return; @@ -315,7 +326,7 @@ namespace Mist { tsStream.partialClear(); //Restore original file position - if (fseek(inFile, bpos, SEEK_SET)) { + if (Util::fseek(inFile, bpos, SEEK_SET)) { return; } } @@ -324,7 +335,7 @@ namespace Mist { void inputTS::seek(int seekTime) { tsStream.clear(); readPMT(); - unsigned long seekPos = 0xFFFFFFFFull; + uint64_t seekPos = 0xFFFFFFFFFFFFFFFFull; for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) { unsigned long thisBPos = 0; for (std::deque::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++) { @@ -337,63 +348,125 @@ namespace Mist { seekPos = thisBPos; } } - fseek(inFile, seekPos, SEEK_SET);//seek to the correct position + Util::fseek(inFile, seekPos, SEEK_SET);//seek to the correct position } void inputTS::stream() { + IPC::semaphore pullLock; + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock){ + FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); + return; + } + if (!pullLock.tryWait()){ + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); + return; + } + const std::string & inpt = config->getString("input"); + if (inpt.substr(0, 8) == "tsudp://"){ + HTTP::URL input_url(inpt); + udpCon.setBlocking(false); + udpCon.bind(input_url.getPort(), input_url.host, input_url.path); + if (udpCon.getSock() == -1){ + FAIL_MSG("Could not open UDP socket. Aborting."); + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + return; + } + } IPC::sharedClient statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); uint64_t downCounter = 0; uint64_t startTime = Util::epoch(); uint64_t noDataSince = Util::bootSecs(); + bool gettingData = false; bool hasStarted = false; cfgPointer = config; globalStreamName = streamName; unsigned long long threadCheckTimer = Util::bootSecs(); while (config->is_active) { - if (inFile) { - if (feof(inFile)){ - config->is_active = false; - INFO_MSG("Reached end of file on streamed input"); + if (tcpCon) { + if (tcpCon.spool()){ + while (tcpCon.Received().available(188)){ + while (tcpCon.Received().get()[0] != 0x47 && tcpCon.Received().available(188)){ + tcpCon.Received().remove(1); + } + if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){ + std::string newData = tcpCon.Received().remove(188); + tsBuf.FromPointer(newData.data()); + liveStream.add(tsBuf); + if (!liveStream.isDataTrack(tsBuf.getPID())){ + liveStream.parse(tsBuf.getPID()); + } + } + } + noDataSince = Util::bootSecs(); + }else{ + Util::sleep(100); } - int ctr = 0; - while (ctr < 20 && tsBuf.FromFile(inFile) && !feof(inFile)){ - liveStream.add(tsBuf); - downCounter += 188; - ctr++; + if (!tcpCon){ + config->is_active = false; + INFO_MSG("End of streamed input"); } } else { std::string leftData; + bool received = false; while (udpCon.Receive()) { + downCounter += udpCon.data_len; + received = true; + if (!gettingData){ + gettingData = true; + INFO_MSG("Now receiving UDP data..."); + } int offset = 0; //Try to read full TS Packets //Watch out! We push here to a global, in order for threads to be able to access it. while (offset < udpCon.data_len) { - if (udpCon.data[0] == 0x47){//check for sync byte + if (udpCon.data[offset] == 0x47){//check for sync byte if (offset + 188 <= udpCon.data_len){ - liveStream.add(udpCon.data + offset); - noDataSince = Util::bootSecs(); - downCounter += 188; + tsBuf.FromPointer(udpCon.data + offset); + liveStream.add(tsBuf); + if (!liveStream.isDataTrack(tsBuf.getPID())){ + liveStream.parse(tsBuf.getPID()); + } + leftData.clear(); }else{ leftData.append(udpCon.data + offset, udpCon.data_len - offset); } offset += 188; }else{ + uint32_t maxBytes = std::min((uint32_t)(188 - leftData.size()), (uint32_t)(udpCon.data_len - offset)); + uint32_t numBytes = maxBytes; + VERYHIGH_MSG("%lu bytes of non-sync-byte data received", numBytes); if (leftData.size()){ - leftData.append(udpCon.data + offset, 1); - if (leftData.size() >= 188){ - liveStream.add((char*)leftData.data()); - noDataSince = Util::bootSecs(); - downCounter += 188; + leftData.append(udpCon.data + offset, numBytes); + while (leftData.size() >= 188){ + VERYHIGH_MSG("Assembled scrap packet"); + tsBuf.FromPointer((char*)leftData.data()); + liveStream.add(tsBuf); + if (!liveStream.isDataTrack(tsBuf.getPID())){ + liveStream.parse(tsBuf.getPID()); + } leftData.erase(0, 188); } } - ++offset; + offset += numBytes; } } } + if (!received){ + Util::sleep(100); + }else{ + noDataSince = Util::bootSecs(); + } + } + if (gettingData && Util::bootSecs() - noDataSince > 1){ + gettingData = false; + INFO_MSG("No longer receiving data."); } //Check for and spawn threads here. - if (Util::bootSecs() - threadCheckTimer > 2) { + if (Util::bootSecs() - threadCheckTimer > 1) { //Connect to stats for INPUT detection uint64_t now = Util::epoch(); if (!statsPage.getData()){ @@ -402,6 +475,9 @@ namespace Mist { if (statsPage.getData()){ if (!statsPage.isAlive()){ config->is_active = false; + pullLock.post(); + pullLock.close(); + pullLock.unlink(); return; } IPC::statExchange tmpEx(statsPage.getData()); @@ -410,59 +486,58 @@ namespace Mist { tmpEx.streamName(streamName); tmpEx.connector("INPUT"); tmpEx.up(0); - tmpEx.down(downCounter); + tmpEx.down(downCounter + tcpCon.dataDown()); tmpEx.time(now - startTime); tmpEx.lastSecond(0); statsPage.keepAlive(); } std::set activeTracks = liveStream.getActiveTracks(); - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); - IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - lock.wait(); - if (hasStarted && !threadTimer.size()){ - if (!isAlwaysOn()){ - INFO_MSG("Shutting down because no active threads and we had input in the past"); - config->is_active = false; - }else{ - hasStarted = false; + { + tthread::lock_guard guard(threadClaimMutex); + if (hasStarted && !threadTimer.size()){ + if (!isAlwaysOn()){ + INFO_MSG("Shutting down because no active threads and we had input in the past"); + config->is_active = false; + }else{ + hasStarted = false; + } + } + for (std::set::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) { + if (!liveStream.isDataTrack(*it)){continue;} + if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) { + WARN_MSG("Thread for track %d timed out %d seconds ago without a clean shutdown.", *it, Util::bootSecs() - threadTimer[*it]); + threadTimer.erase(*it); + } + if (!hasStarted){ + hasStarted = true; + } + if (!threadTimer.count(*it)) { + + //Add to list of unclaimed threads + claimableThreads.insert(*it); + + //Spawn thread here. + tthread::thread thisThread(parseThread, 0); + thisThread.detach(); + } } } - for (std::set::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) { - if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) { - WARN_MSG("Thread for track %d timed out %d seconds ago without a clean shutdown.", *it, Util::bootSecs() - threadTimer[*it]); - threadTimer.erase(*it); - } - if (!hasStarted){ - hasStarted = true; - } - if (!threadTimer.count(*it)) { - - //Add to list of unclaimed threads - claimableThreads.insert(*it); - - //Spawn thread here. - tthread::thread thisThread(parseThread, 0); - thisThread.detach(); - } - } - lock.post(); threadCheckTimer = Util::bootSecs(); } - if (!inFile){ - Util::sleep(100); - if (Util::bootSecs() - noDataSince > 20){ - if (!isAlwaysOn()){ - WARN_MSG("No packets received for 20 seconds - terminating"); - config->is_active = false; - }else{ - noDataSince = Util::bootSecs(); - } + if (Util::bootSecs() - noDataSince > 20){ + if (!isAlwaysOn()){ + WARN_MSG("No packets received for 20 seconds - terminating"); + config->is_active = false; + }else{ + noDataSince = Util::bootSecs(); } } } finish(); + pullLock.post(); + pullLock.close(); + pullLock.unlink(); INFO_MSG("Input for stream %s closing clean", streamName.c_str()); } @@ -471,16 +546,12 @@ namespace Mist { Input::finish(); return; } - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); - IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - - int threadCount = 0; do { - lock.wait(); - threadCount = threadTimer.size(); - lock.post(); + { + tthread::lock_guard guard(threadClaimMutex); + threadCount = threadTimer.size(); + } if (threadCount){ Util::sleep(100); } @@ -492,7 +563,7 @@ namespace Mist { if (!standAlone){return false;} //otherwise, check input param const std::string & inpt = config->getString("input"); - if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://" && inpt.substr(0,8) != "tsudp://" && inpt.substr(0, 8) != "ts-exec:"){ + if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://" && inpt.substr(0,8) != "tsudp://" && inpt.substr(0, 8) != "ts-exec:" && inpt.substr(0, 7) != "http://" && inpt.substr(0, 10) != "http-ts://"){ return true; }else{ return false; diff --git a/src/input/input_ts.h b/src/input/input_ts.h index e9ab3237..8d543ee7 100755 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -29,7 +29,7 @@ namespace Mist { FILE * inFile;///