Various TS improvements

This commit is contained in:
Thulinma 2020-06-25 00:18:23 +02:00
parent 3d26741148
commit 1fff195f77
7 changed files with 110 additions and 84 deletions

View file

@ -317,6 +317,7 @@ namespace TS{
}
if (psCache->size() <= 1){
if (!finished){FAIL_MSG("No PES packets to parse");}
seenUnitStart[tid] = 0;
return;
}
// Find number of packets before unit Start
@ -336,6 +337,7 @@ namespace TS{
}
if (!finished && curPack == psCache->end()){
FAIL_MSG("No PES packets to parse (%" PRIu32 ")", seenUnitStart[tid]);
seenUnitStart[tid] = 0;
return;
}

View file

@ -37,14 +37,11 @@ namespace Mist{
void Input::reloadClientMeta(){
if (M.getStreamName() != "" && M.getMaster()){return;}
if (M.getStreamName() != streamName){
meta.reInit(streamName, false);
}else{
meta.refresh();
}
}
bool Input::hasMeta() const{return M && M.getStreamName() != "" && M.getValidTracks().size();}
bool Input::trackLoaded(size_t idx) const{return (M && M.trackLoaded(idx));}
Input::Input(Util::Config *cfg) : InOutBase(){
config = cfg;

View file

@ -31,6 +31,7 @@ namespace Mist{
bool keepAlive();
void reloadClientMeta();
bool hasMeta() const;
bool trackLoaded(size_t idx) const;
static Util::Config *config;
virtual bool needsLock(){return !config->getBool("realtime");}
virtual bool publishesTracks(){return true;}

View file

@ -31,7 +31,11 @@ std::map<size_t, uint64_t> threadTimer;
std::set<size_t> claimableThreads;
/// Global, so that all tracks stay in sync
int64_t timeStampOffset = 0;
void parseThread(void *mistIn){
uint64_t lastTimeStamp = 0;
Mist::inputTS *input = reinterpret_cast<Mist::inputTS *>(mistIn);
size_t tid = 0;
@ -46,76 +50,87 @@ void parseThread(void *mistIn){
Comms::Users userConn;
DTSC::Meta meta;
DTSC::Packet pack;
bool dataTrack = liveStream.isDataTrack(tid);
if (dataTrack){
if (!Util::streamAlive(globalStreamName) &&
!Util::startInput(globalStreamName, "push://INTERNAL_ONLY:" + cfgPointer->getString("input"), true, true)){
FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str());
return;
}
size_t idx = INVALID_TRACK_ID;
{
tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
if (!input->hasMeta()){input->reloadClientMeta();}
}
meta.reInit(globalStreamName, false);
}
size_t idx = meta.trackIDToIndex(tid, getpid());
threadTimer[tid] = Util::bootSecs();
}
while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active &&
(!liveStream.isDataTrack(tid) || (userConn ? userConn.isAlive() : true))){
{
tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
threadTimer[tid] = Util::bootSecs();
}
if (liveStream.isDataTrack(tid)){userConn.keepAlive();}
(!dataTrack || (userConn ? userConn.isAlive() : true))){
if (dataTrack){userConn.keepAlive();}
liveStream.parse(tid);
if (!liveStream.hasPacket(tid)){
Util::sleep(100);
continue;
}
uint64_t startSecs = Util::bootSecs();
while (liveStream.hasPacket(tid) &&
((Util::bootSecs() < startSecs + 2) && cfgPointer->is_active &&
(!liveStream.isDataTrack(tid) || (userConn ? userConn.isAlive() : true)))){
liveStream.parse(tid);
if (liveStream.hasPacket(tid)){
if (idx == INVALID_TRACK_ID){
threadTimer[tid] = Util::bootSecs();
//Non-stream tracks simply flush all packets and continue
if (!dataTrack){
while (liveStream.hasPacket(tid)){liveStream.getPacket(tid, pack);}
continue;
}
//If we arrive here, we want the stream data
//Make sure the track is valid, loaded, etc
if (!meta || idx == INVALID_TRACK_ID || !meta.trackValid(idx)){
{//Only lock the mutex for as long as strictly necessary
tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
std::map<std::string, std::string> overrides;
overrides["singular"] = "";
if (!Util::streamAlive(globalStreamName) && !Util::startInput(globalStreamName, "push://INTERNAL_ONLY:" + cfgPointer->getString("input"), true, true, overrides)){
FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str());
return;
}
if (!input->hasMeta()){input->reloadClientMeta();}
}
//This meta object is thread local, no mutex needed
meta.reInit(globalStreamName, false);
if (!meta){
//Meta init failure, retry later
Util::sleep(100);
continue;
}
liveStream.initializeMetadata(meta, tid);
idx = meta.trackIDToIndex(tid, getpid());
if (idx != INVALID_TRACK_ID){
//Successfully assigned a track index! Inform the buffer we're pushing
userConn.reload(globalStreamName, idx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
input->reloadClientMeta();
}
//Any kind of failure? Retry later.
if (idx == INVALID_TRACK_ID || !meta.trackValid(idx)){
Util::sleep(100);
continue;
}
}
if (idx == INVALID_TRACK_ID || !meta.trackValid(idx)){continue;}
if (!meta.trackLoaded(idx)){meta.refresh();}
DTSC::Packet pack;
while (liveStream.hasPacket(tid)){
liveStream.getPacket(tid, pack);
if (pack){
tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
if (!input->hasMeta()){input->reloadClientMeta();}
if (dataTrack){
char *data;
size_t dataLen;
pack.getString("data", data, dataLen);
input->bufferLivePacket(pack.getTime(), pack.getInt("offset"), idx, data, dataLen,
pack.getInt("bpos"), pack.getFlag("keyframe"));
}
uint64_t adjustTime = pack.getTime() + timeStampOffset;
if (lastTimeStamp || timeStampOffset){
if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){
INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.", PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime));
timeStampOffset += (lastTimeStamp-adjustTime);
adjustTime = pack.getTime() + timeStampOffset;
}
}
lastTimeStamp = adjustTime;
{
tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
threadTimer[tid] = Util::bootSecs();
//If the main thread's local metadata doesn't have this track yet, reload metadata
if (!input->trackLoaded(idx)){
input->reloadClientMeta();
if (!input->trackLoaded(idx)){
FAIL_MSG("Track %zu could not be loaded into main thread - throwing away packet", idx);
continue;
}
}
input->bufferLivePacket(adjustTime, pack.getInt("offset"), idx, data, dataLen,
pack.getInt("bpos"), pack.getFlag("keyframe"));
}
if (!liveStream.hasPacket(tid)){
if (liveStream.isDataTrack(tid)){userConn.keepAlive();}
Util::sleep(100);
}
}
}
@ -509,7 +524,6 @@ namespace Mist{
return;
}
}else{
std::string leftData;
bool received = false;
while (udpCon.Receive()){
downCounter += udpCon.data.size();
@ -519,35 +533,40 @@ namespace Mist{
INFO_MSG("Now receiving UDP data...");
}
size_t offset = 0;
size_t amount = 188-leftData.size();
if (leftData.size() && udpCon.data.size() >= amount){
//Attempt to re-assemble a packet from the leftovers of last time + current head
if (udpCon.data.size() == amount || udpCon.data[amount] == 0x47){
VERYHIGH_MSG("Assembled scrap packet");
//Success!
leftData.append(udpCon.data, amount);
liveStream.add(leftData);
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
offset = amount;
leftData.assign(0,0);
}
//On failure, hope we might live to succeed another day
}
// Try to read full TS Packets
// Watch out! We push here to a global, in order for threads to be able to access it.
size_t junk = 0;
while (offset < udpCon.data.size()){
if (udpCon.data[offset] == 0x47){// check for sync byte
if (udpCon.data[offset] == 0x47 && (offset+188 >= udpCon.data.size() || udpCon.data[offset+188] == 0x47)){// check for sync byte
if (junk){
INFO_MSG("%zu bytes of non-sync-byte data received", junk);
junk = 0;
}
if (offset + 188 <= udpCon.data.size()){
tsBuf.FromPointer(udpCon.data + offset);
liveStream.add(tsBuf);
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
leftData.clear();
}else{
leftData.append(udpCon.data + offset, udpCon.data.size() - offset);
leftData.assign(udpCon.data + offset, udpCon.data.size() - offset);
}
offset += 188;
}else{
uint32_t maxBytes =
std::min((uint32_t)(188 - leftData.size()), (uint32_t)(udpCon.data.size() - offset));
uint32_t numBytes = maxBytes;
VERYHIGH_MSG("%" PRIu32 " bytes of non-sync-byte data received", numBytes);
if (leftData.size()){
leftData.append(udpCon.data + offset, numBytes);
while (leftData.size() >= 188){
VERYHIGH_MSG("Assembled scrap packet");
tsBuf.FromPointer((char *)leftData.data());
liveStream.add(tsBuf);
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
leftData.erase(0, 188);
}
}
offset += numBytes;
++junk;
++offset;
}
}
}
@ -593,6 +612,7 @@ namespace Mist{
Util::logExitReason("no active threads and we had input in the past");
return;
}else{
liveStream.clear();
hasStarted = false;
}
}

View file

@ -33,6 +33,7 @@ namespace Mist{
void streamMainLoop();
void finish();
FILE *inFile; ///< The input file with ts data
Util::ResizeablePointer leftData;
TS::Stream tsStream; ///< Used for parsing the incoming ts stream
Socket::UDPConnection udpCon;
Socket::Connection tcpCon;

View file

@ -124,7 +124,7 @@ namespace Mist{
std::set<size_t> getSupportedTracks(const std::string &type = "") const;
inline bool keepGoing(){return config->is_active && myConn;}
inline virtual bool keepGoing(){return config->is_active && myConn;}
Comms::Statistics statComm;
bool isBlocking; ///< If true, indicates that myConn is blocking.

View file

@ -20,6 +20,11 @@ namespace Mist{
Socket::UDPConnection pushSock;
TS::Stream tsIn;
std::string getStatsName();
protected:
inline virtual bool keepGoing(){
return config->is_active && (!listenMode() || myConn);
}
};
}// namespace Mist