diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index c1eec5bd..506c0613 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -317,6 +317,7 @@ namespace TS{ } if (psCache->size() <= 1){ if (!finished){FAIL_MSG("No PES packets to parse");} + seenUnitStart[tid] = 0; return; } // Find number of packets before unit Start @@ -336,6 +337,7 @@ namespace TS{ } if (!finished && curPack == psCache->end()){ FAIL_MSG("No PES packets to parse (%" PRIu32 ")", seenUnitStart[tid]); + seenUnitStart[tid] = 0; return; } diff --git a/src/input/input.cpp b/src/input/input.cpp index 15ef2e01..b08bbe7f 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -37,14 +37,11 @@ namespace Mist{ void Input::reloadClientMeta(){ if (M.getStreamName() != "" && M.getMaster()){return;} - if (M.getStreamName() != streamName){ - meta.reInit(streamName, false); - }else{ - meta.refresh(); - } + meta.reInit(streamName, false); } bool Input::hasMeta() const{return M && M.getStreamName() != "" && M.getValidTracks().size();} + bool Input::trackLoaded(size_t idx) const{return (M && M.trackLoaded(idx));} Input::Input(Util::Config *cfg) : InOutBase(){ config = cfg; diff --git a/src/input/input.h b/src/input/input.h index 500eab6b..a2ba4258 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -31,6 +31,7 @@ namespace Mist{ bool keepAlive(); void reloadClientMeta(); bool hasMeta() const; + bool trackLoaded(size_t idx) const; static Util::Config *config; virtual bool needsLock(){return !config->getBool("realtime");} virtual bool publishesTracks(){return true;} diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 4ba2a703..9def0470 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -31,7 +31,11 @@ std::map threadTimer; std::set claimableThreads; +/// Global, so that all tracks stay in sync +int64_t timeStampOffset = 0; + void parseThread(void *mistIn){ + uint64_t lastTimeStamp = 0; Mist::inputTS *input = reinterpret_cast(mistIn); size_t tid = 0; @@ -46,76 +50,87 @@ void parseThread(void *mistIn){ Comms::Users userConn; DTSC::Meta meta; - + DTSC::Packet pack; bool dataTrack = liveStream.isDataTrack(tid); - - if (dataTrack){ - if (!Util::streamAlive(globalStreamName) && - !Util::startInput(globalStreamName, "push://INTERNAL_ONLY:" + cfgPointer->getString("input"), true, true)){ - FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str()); - return; - } - - { - tthread::lock_guard guard(threadClaimMutex); - if (!input->hasMeta()){input->reloadClientMeta();} - } - meta.reInit(globalStreamName, false); + size_t idx = INVALID_TRACK_ID; + { + tthread::lock_guard guard(threadClaimMutex); + threadTimer[tid] = Util::bootSecs(); } - - size_t idx = meta.trackIDToIndex(tid, getpid()); - - threadTimer[tid] = Util::bootSecs(); while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active && - (!liveStream.isDataTrack(tid) || (userConn ? userConn.isAlive() : true))){ - { - tthread::lock_guard guard(threadClaimMutex); - threadTimer[tid] = Util::bootSecs(); - } - if (liveStream.isDataTrack(tid)){userConn.keepAlive();} + (!dataTrack || (userConn ? userConn.isAlive() : true))){ + if (dataTrack){userConn.keepAlive();} liveStream.parse(tid); if (!liveStream.hasPacket(tid)){ Util::sleep(100); continue; } - uint64_t startSecs = Util::bootSecs(); - while (liveStream.hasPacket(tid) && - ((Util::bootSecs() < startSecs + 2) && cfgPointer->is_active && - (!liveStream.isDataTrack(tid) || (userConn ? userConn.isAlive() : true)))){ - liveStream.parse(tid); - if (liveStream.hasPacket(tid)){ - if (idx == INVALID_TRACK_ID){ - tthread::lock_guard guard(threadClaimMutex); - liveStream.initializeMetadata(meta, tid); - idx = meta.trackIDToIndex(tid, getpid()); - if (idx != INVALID_TRACK_ID){ - userConn.reload(globalStreamName, idx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); - input->reloadClientMeta(); - } - } - if (idx == INVALID_TRACK_ID || !meta.trackValid(idx)){continue;} - if (!meta.trackLoaded(idx)){meta.refresh();} - DTSC::Packet pack; - liveStream.getPacket(tid, pack); - if (pack){ - tthread::lock_guard guard(threadClaimMutex); - if (!input->hasMeta()){input->reloadClientMeta();} - if (dataTrack){ - char *data; - size_t dataLen; - pack.getString("data", data, dataLen); - input->bufferLivePacket(pack.getTime(), pack.getInt("offset"), idx, data, dataLen, - pack.getInt("bpos"), pack.getFlag("keyframe")); - } - } - } - { + threadTimer[tid] = Util::bootSecs(); + //Non-stream tracks simply flush all packets and continue + if (!dataTrack){ + while (liveStream.hasPacket(tid)){liveStream.getPacket(tid, pack);} + continue; + } + //If we arrive here, we want the stream data + //Make sure the track is valid, loaded, etc + if (!meta || idx == INVALID_TRACK_ID || !meta.trackValid(idx)){ + {//Only lock the mutex for as long as strictly necessary tthread::lock_guard guard(threadClaimMutex); - threadTimer[tid] = Util::bootSecs(); + std::map overrides; + overrides["singular"] = ""; + if (!Util::streamAlive(globalStreamName) && !Util::startInput(globalStreamName, "push://INTERNAL_ONLY:" + cfgPointer->getString("input"), true, true, overrides)){ + FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str()); + return; + } + if (!input->hasMeta()){input->reloadClientMeta();} } - if (!liveStream.hasPacket(tid)){ - if (liveStream.isDataTrack(tid)){userConn.keepAlive();} + //This meta object is thread local, no mutex needed + meta.reInit(globalStreamName, false); + if (!meta){ + //Meta init failure, retry later Util::sleep(100); + continue; + } + liveStream.initializeMetadata(meta, tid); + idx = meta.trackIDToIndex(tid, getpid()); + if (idx != INVALID_TRACK_ID){ + //Successfully assigned a track index! Inform the buffer we're pushing + userConn.reload(globalStreamName, idx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + } + //Any kind of failure? Retry later. + if (idx == INVALID_TRACK_ID || !meta.trackValid(idx)){ + Util::sleep(100); + continue; + } + } + while (liveStream.hasPacket(tid)){ + liveStream.getPacket(tid, pack); + if (pack){ + char *data; + size_t dataLen; + pack.getString("data", data, dataLen); + uint64_t adjustTime = pack.getTime() + timeStampOffset; + if (lastTimeStamp || timeStampOffset){ + if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){ + INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.", PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime)); + timeStampOffset += (lastTimeStamp-adjustTime); + adjustTime = pack.getTime() + timeStampOffset; + } + } + lastTimeStamp = adjustTime; + { + tthread::lock_guard guard(threadClaimMutex); + //If the main thread's local metadata doesn't have this track yet, reload metadata + if (!input->trackLoaded(idx)){ + input->reloadClientMeta(); + if (!input->trackLoaded(idx)){ + FAIL_MSG("Track %zu could not be loaded into main thread - throwing away packet", idx); + continue; + } + } + input->bufferLivePacket(adjustTime, pack.getInt("offset"), idx, data, dataLen, + pack.getInt("bpos"), pack.getFlag("keyframe")); + } } } } @@ -509,7 +524,6 @@ namespace Mist{ return; } }else{ - std::string leftData; bool received = false; while (udpCon.Receive()){ downCounter += udpCon.data.size(); @@ -519,35 +533,40 @@ namespace Mist{ INFO_MSG("Now receiving UDP data..."); } size_t offset = 0; + size_t amount = 188-leftData.size(); + if (leftData.size() && udpCon.data.size() >= amount){ + //Attempt to re-assemble a packet from the leftovers of last time + current head + if (udpCon.data.size() == amount || udpCon.data[amount] == 0x47){ + VERYHIGH_MSG("Assembled scrap packet"); + //Success! + leftData.append(udpCon.data, amount); + liveStream.add(leftData); + if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} + offset = amount; + leftData.assign(0,0); + } + //On failure, hope we might live to succeed another day + } // Try to read full TS Packets // Watch out! We push here to a global, in order for threads to be able to access it. + size_t junk = 0; while (offset < udpCon.data.size()){ - if (udpCon.data[offset] == 0x47){// check for sync byte + if (udpCon.data[offset] == 0x47 && (offset+188 >= udpCon.data.size() || udpCon.data[offset+188] == 0x47)){// check for sync byte + if (junk){ + INFO_MSG("%zu bytes of non-sync-byte data received", junk); + junk = 0; + } if (offset + 188 <= udpCon.data.size()){ tsBuf.FromPointer(udpCon.data + offset); liveStream.add(tsBuf); if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} - leftData.clear(); }else{ - leftData.append(udpCon.data + offset, udpCon.data.size() - offset); + leftData.assign(udpCon.data + offset, udpCon.data.size() - offset); } offset += 188; }else{ - uint32_t maxBytes = - std::min((uint32_t)(188 - leftData.size()), (uint32_t)(udpCon.data.size() - offset)); - uint32_t numBytes = maxBytes; - VERYHIGH_MSG("%" PRIu32 " bytes of non-sync-byte data received", numBytes); - if (leftData.size()){ - leftData.append(udpCon.data + offset, numBytes); - while (leftData.size() >= 188){ - VERYHIGH_MSG("Assembled scrap packet"); - tsBuf.FromPointer((char *)leftData.data()); - liveStream.add(tsBuf); - if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} - leftData.erase(0, 188); - } - } - offset += numBytes; + ++junk; + ++offset; } } } @@ -593,6 +612,7 @@ namespace Mist{ Util::logExitReason("no active threads and we had input in the past"); return; }else{ + liveStream.clear(); hasStarted = false; } } diff --git a/src/input/input_ts.h b/src/input/input_ts.h index 5ec46d36..af578c87 100644 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -33,6 +33,7 @@ namespace Mist{ void streamMainLoop(); void finish(); FILE *inFile; ///< The input file with ts data + Util::ResizeablePointer leftData; TS::Stream tsStream; ///< Used for parsing the incoming ts stream Socket::UDPConnection udpCon; Socket::Connection tcpCon; diff --git a/src/output/output.h b/src/output/output.h index 0e368710..8e1afa65 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -124,7 +124,7 @@ namespace Mist{ std::set getSupportedTracks(const std::string &type = "") const; - inline bool keepGoing(){return config->is_active && myConn;} + inline virtual bool keepGoing(){return config->is_active && myConn;} Comms::Statistics statComm; bool isBlocking; ///< If true, indicates that myConn is blocking. diff --git a/src/output/output_ts.h b/src/output/output_ts.h index 09899905..1b2457c5 100644 --- a/src/output/output_ts.h +++ b/src/output/output_ts.h @@ -20,6 +20,11 @@ namespace Mist{ Socket::UDPConnection pushSock; TS::Stream tsIn; std::string getStatsName(); + + protected: + inline virtual bool keepGoing(){ + return config->is_active && (!listenMode() || myConn); + } }; }// namespace Mist