#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "input_ts.h" #include #include #include #define SEM_TS_CLAIM "/MstTSIN%s" std::string globalStreamName; TS::Stream liveStream(true); Util::Config * cfgPointer = NULL; #define THREAD_TIMEOUT 15 std::map threadTimer; std::set claimableThreads; void parseThread(void * ignored) { char semName[NAME_BUFFER_SIZE]; snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); int tid = -1; lock.wait(); if (claimableThreads.size()) { tid = *claimableThreads.begin(); claimableThreads.erase(claimableThreads.begin()); } lock.post(); if (tid == -1) { return; } if (liveStream.isDataTrack(tid)){ if (!Util::startInput(globalStreamName, "push://INTERNAL_ONLY:"+cfgPointer->getString("input"), true, true)) {//manually override stream url to start the buffer FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str()); return; } } Mist::negotiationProxy myProxy; myProxy.streamName = globalStreamName; DTSC::Meta myMeta; if (liveStream.isDataTrack(tid)){ char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, globalStreamName.c_str()); myProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); myProxy.userClient.countAsViewer = false; } threadTimer[tid] = Util::bootSecs(); while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active && (!liveStream.isDataTrack(tid) || myProxy.userClient.isAlive())) { liveStream.parse(tid); if (liveStream.hasPacket(tid)){ liveStream.initializeMetadata(myMeta, tid); DTSC::Packet pack; liveStream.getPacket(tid, pack); if (pack && myMeta.tracks.count(tid)){ myProxy.continueNegotiate(tid, myMeta, true); myProxy.bufferLivePacket(pack, myMeta); } lock.wait(); threadTimer[tid] = Util::bootSecs(); lock.post(); } if (!liveStream.hasPacket(tid)){ if (liveStream.isDataTrack(tid)){ myProxy.userClient.keepAlive(); } Util::sleep(100); } } lock.wait(); std::string reason = "unknown reason"; if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";} if (!cfgPointer->is_active){reason = "input shutting down";} if (!(!liveStream.isDataTrack(tid) || myProxy.userClient.isAlive())){ reason = "buffer disconnect"; cfgPointer->is_active = false; } INFO_MSG("Shutting down thread because %s", reason.c_str()); threadTimer.erase(tid); lock.post(); liveStream.eraseTrack(tid); myProxy.userClient.finish(); } namespace Mist { /// Constructor of TS Input /// \arg cfg Util::Config that contains all current configurations. inputTS::inputTS(Util::Config * cfg) : Input(cfg) { capa["name"] = "TS"; capa["decs"] = "MPEG2-TS input from static files, streamed files, or multicast/unicast UDP socket"; capa["source_match"].append("/*.ts"); capa["source_match"].append("stream://*.ts"); capa["source_match"].append("tsudp://*"); capa["source_match"].append("ts-exec:*"); //These can/may be set to always-on mode capa["always_match"].append("stream://*.ts"); capa["always_match"].append("tsudp://*"); capa["always_match"].append("ts-exec:*"); capa["priority"] = 9ll; capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("HEVC"); capa["codecs"][0u][1u].append("AAC"); capa["codecs"][0u][1u].append("AC3"); inFile = NULL; inputProcess = 0; } inputTS::~inputTS() { if (inFile) { fclose(inFile); } if (!standAlone){ char semName[NAME_BUFFER_SIZE]; snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); lock.wait(); threadTimer.clear(); claimableThreads.clear(); lock.post(); lock.unlink(); } } ///Live Setup of TS Input bool inputTS::preRun() { const std::string & inpt = config->getString("input"); //streamed standard input if (inpt == "-" || inpt.substr(0, 8) == "ts-exec:") { standAlone = false; if (inpt.size() > 1){ std::string input = inpt.substr(8); char *args[128]; uint8_t argCnt = 0; char *startCh = 0; for (char *i = (char*)input.c_str(); i <= input.data() + input.size(); ++i){ if (!*i){ if (startCh){args[argCnt++] = startCh;} break; } if (*i == ' '){ if (startCh){ args[argCnt++] = startCh; startCh = 0; *i = 0; } }else{ if (!startCh){startCh = i;} } } args[argCnt] = 0; int fin = -1, fout = -1, ferr = -1; inputProcess = Util::Procs::StartPiped(args, &fin, &fout, &ferr); inFile = fdopen(fout, "r"); }else{ inFile = stdin; } return true; } //streamed file if (inpt.substr(0,9) == "stream://"){ inFile = fopen(inpt.c_str()+9, "r"); standAlone = false; return inFile; } //UDP input (tsudp://[host:]port[/iface[,iface[,...]]]) if (inpt.substr(0, 8) == "tsudp://"){ HTTP::URL input_url(inpt); standAlone = false; udpCon.setBlocking(false); udpCon.bind(input_url.getPort(), input_url.host, input_url.path); return udpCon.getSock() != -1; } //plain VoD file inFile = fopen(inpt.c_str(), "r"); return inFile; } ///Track selector of TS Input ///\arg trackSpec specifies which tracks are to be selected ///\todo test whether selecting a subset of tracks work void inputTS::trackSelect(std::string trackSpec) { selectedTracks.clear(); long long int index; while (trackSpec != "") { index = trackSpec.find(' '); selectedTracks.insert(atoi(trackSpec.substr(0, index).c_str())); if (index != std::string::npos) { trackSpec.erase(0, index + 1); } else { trackSpec = ""; } } } ///Reads headers from a TS stream, and saves them into metadata ///It works by going through the entire TS stream, and every time ///It encounters a new PES start, it writes the currently found PES data ///for a specific track to metadata. After the entire stream has been read, ///it writes the remaining metadata. ///\todo Find errors, perhaps parts can be made more modular bool inputTS::readHeader() { if (!standAlone){return true;} if (!inFile){return false;} //See whether a separate header file exists. if (readExistingHeader()){return true;} TS::Packet packet;//to analyse and extract data fseek(inFile, 0, SEEK_SET);//seek to beginning long long int lastBpos = 0; while (packet.FromFile(inFile) && !feof(inFile)) { tsStream.parse(packet, lastBpos); lastBpos = ftell(inFile); while (tsStream.hasPacketOnEachTrack()) { DTSC::Packet headerPack; tsStream.getEarliestPacket(headerPack); if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) { tsStream.initializeMetadata(myMeta, headerPack.getTrackId()); } myMeta.update(headerPack); } } DTSC::Packet headerPack; tsStream.getEarliestPacket(headerPack); while (headerPack) { if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) { tsStream.initializeMetadata(myMeta, headerPack.getTrackId()); } myMeta.update(headerPack); tsStream.getEarliestPacket(headerPack); } fseek(inFile, 0, SEEK_SET); myMeta.toFile(config->getString("input") + ".dtsh"); return true; } ///Gets the next packet that is to be sent ///At the moment, the logic of sending the last packet that was finished has been implemented, ///but the seeking and finding data is not yet ready. ///\todo Finish the implementation void inputTS::getNext(bool smart) { INSANE_MSG("Getting next"); thisPacket.null(); bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); while (!hasPacket && !feof(inFile) && (inputProcess == 0 || Util::Procs::childRunning(inputProcess)) && config->is_active) { unsigned int bPos = ftell(inFile); tsBuf.FromFile(inFile); if (selectedTracks.count(tsBuf.getPID())) { tsStream.parse(tsBuf, bPos); } hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); } if (!hasPacket) { return; } if (selectedTracks.size() == 1) { tsStream.getPacket(*selectedTracks.begin(), thisPacket); } else { tsStream.getEarliestPacket(thisPacket); } if (!thisPacket){ FAIL_MSG("Could not getNext TS packet!"); return; } tsStream.initializeMetadata(myMeta); if (!myMeta.tracks.count(thisPacket.getTrackId())) { getNext(); } } void inputTS::readPMT() { //save current file position int bpos = ftell(inFile); if (fseek(inFile, 0, SEEK_SET)) { FAIL_MSG("Seek to 0 failed"); return; } TS::Packet tsBuffer; while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromFile(inFile)) { tsStream.parse(tsBuffer, 0); } //Clear leaves the PMT in place tsStream.partialClear(); //Restore original file position if (fseek(inFile, bpos, SEEK_SET)) { return; } } ///Seeks to a specific time void inputTS::seek(int seekTime) { tsStream.clear(); readPMT(); unsigned long seekPos = 0xFFFFFFFFull; for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) { unsigned long thisBPos = 0; for (std::deque::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++) { if (keyIt->getTime() > seekTime) { break; } thisBPos = keyIt->getBpos(); } if (thisBPos < seekPos) { seekPos = thisBPos; } } fseek(inFile, seekPos, SEEK_SET);//seek to the correct position } void inputTS::stream() { IPC::sharedClient statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); uint64_t downCounter = 0; uint64_t startTime = Util::epoch(); uint64_t noDataSince = Util::bootSecs(); bool hasStarted = false; cfgPointer = config; globalStreamName = streamName; unsigned long long threadCheckTimer = Util::bootSecs(); while (config->is_active) { if (inFile) { if (feof(inFile)){ config->is_active = false; INFO_MSG("Reached end of file on streamed input"); } int ctr = 0; while (ctr < 20 && tsBuf.FromFile(inFile) && !feof(inFile)){ liveStream.add(tsBuf); downCounter += 188; ctr++; } } else { std::string leftData; while (udpCon.Receive()) { int offset = 0; //Try to read full TS Packets //Watch out! We push here to a global, in order for threads to be able to access it. while (offset < udpCon.data_len) { if (udpCon.data[0] == 0x47){//check for sync byte if (offset + 188 <= udpCon.data_len){ liveStream.add(udpCon.data + offset); noDataSince = Util::bootSecs(); downCounter += 188; }else{ leftData.append(udpCon.data + offset, udpCon.data_len - offset); } offset += 188; }else{ if (leftData.size()){ leftData.append(udpCon.data + offset, 1); if (leftData.size() >= 188){ liveStream.add((char*)leftData.data()); noDataSince = Util::bootSecs(); downCounter += 188; leftData.erase(0, 188); } } ++offset; } } } } //Check for and spawn threads here. if (Util::bootSecs() - threadCheckTimer > 2) { //Connect to stats for INPUT detection uint64_t now = Util::epoch(); if (!statsPage.getData()){ statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); } if (statsPage.getData()){ if (!statsPage.isAlive()){ config->is_active = false; return; } IPC::statExchange tmpEx(statsPage.getData()); tmpEx.now(now); tmpEx.crc(getpid()); tmpEx.streamName(streamName); tmpEx.connector("INPUT"); tmpEx.up(0); tmpEx.down(downCounter); tmpEx.time(now - startTime); tmpEx.lastSecond(0); statsPage.keepAlive(); } std::set activeTracks = liveStream.getActiveTracks(); char semName[NAME_BUFFER_SIZE]; snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); lock.wait(); if (hasStarted && !threadTimer.size()){ if (!isAlwaysOn()){ INFO_MSG("Shutting down because no active threads and we had input in the past"); config->is_active = false; }else{ hasStarted = false; } } for (std::set::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) { if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) { WARN_MSG("Thread for track %d timed out %d seconds ago without a clean shutdown.", *it, Util::bootSecs() - threadTimer[*it]); threadTimer.erase(*it); } if (!hasStarted){ hasStarted = true; } if (!threadTimer.count(*it)) { //Add to list of unclaimed threads claimableThreads.insert(*it); //Spawn thread here. tthread::thread thisThread(parseThread, 0); thisThread.detach(); } } lock.post(); threadCheckTimer = Util::bootSecs(); } if (!inFile){ Util::sleep(100); if (Util::bootSecs() - noDataSince > 20){ if (!isAlwaysOn()){ WARN_MSG("No packets received for 20 seconds - terminating"); config->is_active = false; }else{ noDataSince = Util::bootSecs(); } } } } finish(); INFO_MSG("Input for stream %s closing clean", streamName.c_str()); } void inputTS::finish() { if (standAlone){ Input::finish(); return; } char semName[NAME_BUFFER_SIZE]; snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); int threadCount = 0; do { lock.wait(); threadCount = threadTimer.size(); lock.post(); if (threadCount){ Util::sleep(100); } } while (threadCount); } bool inputTS::needsLock() { //we already know no lock will be needed if (!standAlone){return false;} //otherwise, check input param const std::string & inpt = config->getString("input"); if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://" && inpt.substr(0,8) != "tsudp://" && inpt.substr(0, 8) != "ts-exec:"){ return true; }else{ return false; } } }