#include "input_ts.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include tthread::mutex threadClaimMutex; std::string globalStreamName; TS::Stream liveStream; Util::Config *cfgPointer = NULL; #define THREAD_TIMEOUT 15 std::map threadTimer; std::set claimableThreads; void parseThread(void *mistIn){ Mist::inputTS *input = reinterpret_cast(mistIn); size_t tid = 0; { tthread::lock_guard guard(threadClaimMutex); if (claimableThreads.size()){ tid = *claimableThreads.begin(); claimableThreads.erase(claimableThreads.begin()); } } if (tid == 0){return;} Comms::Users userConn; DTSC::Meta meta; bool dataTrack = liveStream.isDataTrack(tid); if (dataTrack){ if (!Util::streamAlive(globalStreamName) && !Util::startInput(globalStreamName, "push://INTERNAL_ONLY:" + cfgPointer->getString("input"), true, true)){ FAIL_MSG("Could not start buffer for %s", globalStreamName.c_str()); return; } { tthread::lock_guard guard(threadClaimMutex); if (!input->hasMeta()){input->reloadClientMeta();} } meta.reInit(globalStreamName, false); } size_t idx = meta.trackIDToIndex(tid, getpid()); threadTimer[tid] = Util::bootSecs(); while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active && (!liveStream.isDataTrack(tid) || (userConn ? userConn.isAlive() : true))){ { tthread::lock_guard guard(threadClaimMutex); threadTimer[tid] = Util::bootSecs(); } if (liveStream.isDataTrack(tid)){userConn.keepAlive();} liveStream.parse(tid); if (!liveStream.hasPacket(tid)){ Util::sleep(100); continue; } uint64_t startSecs = Util::bootSecs(); while (liveStream.hasPacket(tid) && ((Util::bootSecs() < startSecs + 2) && cfgPointer->is_active && (!liveStream.isDataTrack(tid) || (userConn ? userConn.isAlive() : true)))){ liveStream.parse(tid); if (liveStream.hasPacket(tid)){ if (idx == INVALID_TRACK_ID){ tthread::lock_guard guard(threadClaimMutex); liveStream.initializeMetadata(meta, tid); idx = meta.trackIDToIndex(tid, getpid()); if (idx != INVALID_TRACK_ID){ userConn.reload(globalStreamName, idx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); input->reloadClientMeta(); } } if (idx == INVALID_TRACK_ID || !meta.trackValid(idx)){continue;} if (!meta.trackLoaded(idx)){meta.refresh();} DTSC::Packet pack; liveStream.getPacket(tid, pack); if (pack){ tthread::lock_guard guard(threadClaimMutex); if (!input->hasMeta()){input->reloadClientMeta();} if (dataTrack){ char *data; size_t dataLen; pack.getString("data", data, dataLen); input->bufferLivePacket(pack.getTime(), pack.getInt("offset"), idx, data, dataLen, pack.getInt("bpos"), pack.getFlag("keyframe")); } } } { tthread::lock_guard guard(threadClaimMutex); threadTimer[tid] = Util::bootSecs(); } if (!liveStream.hasPacket(tid)){ if (liveStream.isDataTrack(tid)){userConn.keepAlive();} Util::sleep(100); } } } std::string reason = "unknown reason"; if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";} if (!cfgPointer->is_active){reason = "input shutting down";} if (!(!liveStream.isDataTrack(tid) || userConn.isAlive())){ reason = "buffer disconnect"; cfgPointer->is_active = false; } INFO_MSG("Shutting down thread for %zu because %s", tid, reason.c_str()); { tthread::lock_guard guard(threadClaimMutex); threadTimer.erase(tid); } liveStream.eraseTrack(tid); if (dataTrack && userConn){userConn.setStatus(COMM_STATUS_DISCONNECT);} } namespace Mist{ /// Constructor of TS Input /// \arg cfg Util::Config that contains all current configurations. inputTS::inputTS(Util::Config *cfg) : Input(cfg){ capa["name"] = "TS"; capa["desc"] = "This input allows you to stream MPEG2-TS data from static files (/*.ts), streamed files " "or named pipes (stream://*.ts), streamed over HTTP (http(s)://*.ts, http(s)-ts://*), " "standard input (ts-exec:*), or multicast/unicast UDP sockets (tsudp://*)."; capa["source_match"].append("/*.ts"); capa["source_file"] = "$source"; capa["source_match"].append("/*.m2ts"); capa["source_match"].append("stream://*.ts"); capa["source_match"].append("tsudp://*"); capa["source_match"].append("ts-exec:*"); capa["source_match"].append("http://*.ts"); capa["source_match"].append("http-ts://*"); capa["source_match"].append("https://*.ts"); capa["source_match"].append("https-ts://*"); // These can/may be set to always-on mode capa["always_match"].append("stream://*.ts"); capa["always_match"].append("tsudp://*"); capa["always_match"].append("ts-exec:*"); capa["always_match"].append("http://*.ts"); capa["always_match"].append("http-ts://*"); capa["always_match"].append("https://*.ts"); capa["always_match"].append("https-ts://*"); capa["incoming_push_url"] = "udp://$host:$port"; capa["incoming_push_url_match"] = "tsudp://*"; capa["priority"] = 9; capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("HEVC"); capa["codecs"][0u][0u].append("MPEG2"); capa["codecs"][0u][1u].append("AAC"); capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("MP2"); inFile = NULL; inputProcess = 0; isFinished = false; { pid_t srt_tx = -1; const char *args[] ={"srt-live-transmit", 0}; srt_tx = Util::Procs::StartPiped(args, 0, 0, 0); if (srt_tx > 1){ capa["source_match"].append("srt://*"); capa["always_match"].append("srt://*"); capa["desc"] = capa["desc"].asStringRef() + " SRT support (srt://*) is installed and available."; }else{ capa["desc"] = capa["desc"].asStringRef() + " To enable SRT support, please install the srt-live-transmit binary."; } } JSON::Value option; option["arg"] = "integer"; option["long"] = "buffer"; option["short"] = "b"; option["help"] = "DVR buffer time in ms"; option["value"].append(50000); config->addOption("bufferTime", option); capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in milliseconds. This is the time " "available to seek around in, and will automatically be extended to fit whole keyframes as " "well as the minimum duration needed for stable playback."; capa["optional"]["DVR"]["option"] = "--buffer"; capa["optional"]["DVR"]["type"] = "uint"; capa["optional"]["DVR"]["default"] = 50000; } inputTS::~inputTS(){ if (inFile){fclose(inFile);} if (tcpCon){tcpCon.close();} if (!standAlone){ tthread::lock_guard guard(threadClaimMutex); threadTimer.clear(); claimableThreads.clear(); } } bool inputTS::checkArguments(){ if (config->getString("input").substr(0, 6) == "srt://"){ std::string source = config->getString("input"); HTTP::URL srtUrl(source); config->getOption("input", true).append("ts-exec:srt-live-transmit " + srtUrl.getUrl() + " file://con"); INFO_MSG("Rewriting SRT source '%s' to '%s'", source.c_str(), config->getString("input").c_str()); } return true; } /// Live Setup of TS Input bool inputTS::preRun(){ INFO_MSG("Prerun: %s", config->getString("input").c_str()); // streamed standard input if (config->getString("input") == "-"){ standAlone = false; tcpCon.open(fileno(stdout), fileno(stdin)); return true; } if (config->getString("input").substr(0, 7) == "http://" || config->getString("input").substr(0, 10) == "http-ts://" || config->getString("input").substr(0, 8) == "https://" || config->getString("input").substr(0, 11) == "https-ts://"){ standAlone = false; HTTP::URL url(config->getString("input")); if (url.protocol == "http-ts"){url.protocol = "http";} if (url.protocol == "https-ts"){url.protocol = "https";} HTTP::Downloader DL; DL.getHTTP().headerOnly = true; if (!DL.get(url)){return false;} tcpCon = DL.getSocket(); DL.getSocket().drop(); // Prevent shutdown of connection, keeping copy of socket open return true; } if (config->getString("input").substr(0, 8) == "ts-exec:"){ standAlone = false; std::string input = config->getString("input").substr(8); char *args[128]; uint8_t argCnt = 0; char *startCh = 0; for (char *i = (char *)input.c_str(); i <= input.data() + input.size(); ++i){ if (!*i){ if (startCh){args[argCnt++] = startCh;} break; } if (*i == ' '){ if (startCh){ args[argCnt++] = startCh; startCh = 0; *i = 0; } }else{ if (!startCh){startCh = i;} } } args[argCnt] = 0; int fin = -1, fout = -1; inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0); tcpCon.open(-1, fout); return true; } // streamed file if (config->getString("input").substr(0, 9) == "stream://"){ inFile = fopen(config->getString("input").c_str() + 9, "r"); tcpCon.open(-1, fileno(inFile)); standAlone = false; return inFile; } //file descriptor input if (config->getString("input").substr(0, 5) == "fd://"){ int fd = atoi(config->getString("input").c_str() + 5); INFO_MSG("Opening file descriptor %s (%d)", config->getString("input").c_str(), fd); tcpCon.open(-1, fd); standAlone = false; return tcpCon; } // UDP input (tsudp://[host:]port[/iface[,iface[,...]]]) if (config->getString("input").substr(0, 8) == "tsudp://"){ standAlone = false; return true; } // plain VoD file inFile = fopen(config->getString("input").c_str(), "r"); return inFile; } bool inputTS::needHeader(){ if (!standAlone){return false;} return Input::needHeader(); } /// Reads headers from a TS stream, and saves them into metadata /// It works by going through the entire TS stream, and every time /// It encounters a new PES start, it writes the currently found PES data /// for a specific track to metadata. After the entire stream has been read, /// it writes the remaining metadata. ///\todo Find errors, perhaps parts can be made more modular bool inputTS::readHeader(){ if (!inFile){return false;} meta.reInit(streamName); TS::Packet packet; // to analyse and extract data DTSC::Packet headerPack; fseek(inFile, 0, SEEK_SET); // seek to beginning uint64_t lastBpos = 0; while (packet.FromFile(inFile) && !feof(inFile)){ tsStream.parse(packet, lastBpos); lastBpos = Util::ftell(inFile); if (packet.getUnitStart()){ while (tsStream.hasPacketOnEachTrack()){ tsStream.getEarliestPacket(headerPack); size_t pid = headerPack.getTrackId(); size_t idx = M.trackIDToIndex(pid, getpid()); if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ tsStream.initializeMetadata(meta, pid); idx = M.trackIDToIndex(pid, getpid()); } char *data; size_t dataLen; headerPack.getString("data", data, dataLen); meta.update(headerPack.getTime(), headerPack.getInt("offset"), idx, dataLen, headerPack.getInt("bpos"), headerPack.getFlag("keyframe"), headerPack.getDataLen()); } } } tsStream.finish(); INFO_MSG("Reached %s at %" PRIu64 " bytes", feof(inFile) ? "EOF" : "error", lastBpos); while (tsStream.hasPacket()){ tsStream.getEarliestPacket(headerPack); size_t pid = headerPack.getTrackId(); size_t idx = M.trackIDToIndex(pid, getpid()); if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ tsStream.initializeMetadata(meta, pid); idx = M.trackIDToIndex(pid, getpid()); } char *data; size_t dataLen; headerPack.getString("data", data, dataLen); meta.update(headerPack.getTime(), headerPack.getInt("offset"), idx, dataLen, headerPack.getInt("bpos"), headerPack.getFlag("keyframe"), headerPack.getDataLen()); } fseek(inFile, 0, SEEK_SET); meta.toFile(config->getString("input") + ".dtsh"); return true; } /// Gets the next packet that is to be sent /// At the moment, the logic of sending the last packet that was finished has been implemented, /// but the seeking and finding data is not yet ready. ///\todo Finish the implementation void inputTS::getNext(size_t idx){ size_t pid = (idx == INVALID_TRACK_ID ? 0 : M.getID(idx)); INSANE_MSG("Getting next on track %zu", idx); thisPacket.null(); bool hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid)); while (!hasPacket && !feof(inFile) && (inputProcess == 0 || Util::Procs::childRunning(inputProcess)) && config->is_active){ tsBuf.FromFile(inFile); if (idx == INVALID_TRACK_ID || pid == tsBuf.getPID()){ tsStream.parse(tsBuf, 0); // bPos == 0 if (tsBuf.getUnitStart()){ hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid)); } } } if (feof(inFile)){ if (!isFinished){ tsStream.finish(); isFinished = true; } hasPacket = true; } if (!hasPacket){return;} if (idx == INVALID_TRACK_ID){ if (tsStream.hasPacket()){tsStream.getEarliestPacket(thisPacket);} }else{ if (tsStream.hasPacket(pid)){tsStream.getPacket(pid, thisPacket);} } if (!thisPacket){ INFO_MSG("Could not getNext TS packet!"); return; } tsStream.initializeMetadata(meta); size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid()); if (thisIdx == INVALID_TRACK_ID){getNext(idx);} } void inputTS::readPMT(){ // save current file position uint64_t bpos = Util::ftell(inFile); if (fseek(inFile, 0, SEEK_SET)){ FAIL_MSG("Seek to 0 failed"); return; } TS::Packet tsBuffer; while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromFile(inFile)){ tsStream.parse(tsBuffer, 0); } // Clear leaves the PMT in place tsStream.partialClear(); // Restore original file position if (Util::fseek(inFile, bpos, SEEK_SET)){ clearerr(inFile); return; } } /// Seeks to a specific time void inputTS::seek(uint64_t seekTime, size_t idx){ tsStream.clear(); readPMT(); uint64_t seekPos = 0xFFFFFFFFull; if (idx != INVALID_TRACK_ID){ DTSC::Keys keys(M.keys(idx)); uint32_t keyNum = keys.getNumForTime(seekTime); seekPos = keys.getBpos(keyNum); }else{ std::set tracks = M.getValidTracks(); for (std::set::iterator it = tracks.begin(); it != tracks.end(); it++){ DTSC::Keys keys(M.keys(*it)); uint32_t keyNum = keys.getNumForTime(seekTime); uint64_t thisBPos = keys.getBpos(keyNum); if (thisBPos < seekPos){seekPos = thisBPos;} } } clearerr(inFile); Util::fseek(inFile, seekPos, SEEK_SET); // seek to the correct position } bool inputTS::openStreamSource(){ const std::string &inpt = config->getString("input"); if (inpt.substr(0, 8) == "tsudp://"){ HTTP::URL input_url(inpt); udpCon.setBlocking(false); udpCon.bind(input_url.getPort(), input_url.host, input_url.path); if (udpCon.getSock() == -1){ FAIL_MSG("Could not open UDP socket. Aborting."); return false; } } return true; } void inputTS::parseStreamHeader(){ // Placeholder empty track to force normal code to continue despite no tracks available tmpIdx = meta.addTrack(0, 0, 0, 0); } void inputTS::streamMainLoop(){ meta.removeTrack(tmpIdx); INFO_MSG("Removed temptrack %zu", tmpIdx); Comms::Statistics statComm; uint64_t downCounter = 0; uint64_t startTime = Util::bootSecs(); uint64_t noDataSince = Util::bootSecs(); bool gettingData = false; bool hasStarted = false; cfgPointer = config; globalStreamName = streamName; unsigned long long threadCheckTimer = Util::bootSecs(); while (config->is_active){ if (tcpCon){ if (tcpCon.spool()){ while (tcpCon.Received().available(188)){ while (tcpCon.Received().get()[0] != 0x47 && tcpCon.Received().available(188)){ tcpCon.Received().remove(1); } if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){ std::string newData = tcpCon.Received().remove(188); tsBuf.FromPointer(newData.data()); liveStream.add(tsBuf); if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} } } noDataSince = Util::bootSecs(); }else{ Util::sleep(100); } if (!tcpCon){ config->is_active = false; Util::logExitReason("end of streamed input"); return; } }else{ std::string leftData; bool received = false; while (udpCon.Receive()){ downCounter += udpCon.data_len; received = true; if (!gettingData){ gettingData = true; INFO_MSG("Now receiving UDP data..."); } size_t offset = 0; // Try to read full TS Packets // Watch out! We push here to a global, in order for threads to be able to access it. while (offset < udpCon.data_len){ if (udpCon.data[offset] == 0x47){// check for sync byte if (offset + 188 <= udpCon.data_len){ tsBuf.FromPointer(udpCon.data + offset); liveStream.add(tsBuf); if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} leftData.clear(); }else{ leftData.append(udpCon.data + offset, udpCon.data_len - offset); } offset += 188; }else{ uint32_t maxBytes = std::min((uint32_t)(188 - leftData.size()), (uint32_t)(udpCon.data_len - offset)); uint32_t numBytes = maxBytes; VERYHIGH_MSG("%" PRIu32 " bytes of non-sync-byte data received", numBytes); if (leftData.size()){ leftData.append(udpCon.data + offset, numBytes); while (leftData.size() >= 188){ VERYHIGH_MSG("Assembled scrap packet"); tsBuf.FromPointer((char *)leftData.data()); liveStream.add(tsBuf); if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} leftData.erase(0, 188); } } offset += numBytes; } } } if (!received){ Util::sleep(100); }else{ noDataSince = Util::bootSecs(); } } if (gettingData && Util::bootSecs() - noDataSince > 1){ gettingData = false; INFO_MSG("No longer receiving data."); } // Check for and spawn threads here. if (Util::bootSecs() - threadCheckTimer > 1){ // Connect to stats for INPUT detection statComm.reload(); if (statComm){ if (!statComm.isAlive()){ config->is_active = false; Util::logExitReason("received shutdown request from controller"); return; } uint64_t now = Util::bootSecs(); statComm.setNow(now); statComm.setCRC(getpid()); statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setUp(0); statComm.setDown(downCounter + tcpCon.dataDown()); statComm.setTime(now - startTime); statComm.setLastSecond(0); statComm.setHost(getConnectedBinHost()); statComm.keepAlive(); } std::set activeTracks = liveStream.getActiveTracks(); { tthread::lock_guard guard(threadClaimMutex); if (hasStarted && !threadTimer.size()){ if (!isAlwaysOn()){ config->is_active = false; Util::logExitReason("no active threads and we had input in the past"); return; }else{ hasStarted = false; } } for (std::set::iterator it = activeTracks.begin(); it != activeTracks.end(); it++){ if (!liveStream.isDataTrack(*it)){continue;} if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))){ WARN_MSG("Thread for track %" PRIu64 " timed out %" PRIu64 " seconds ago without a clean shutdown.", *it, Util::bootSecs() - threadTimer[*it]); threadTimer.erase(*it); } if (!hasStarted){hasStarted = true;} if (!threadTimer.count(*it)){ // Add to list of unclaimed threads claimableThreads.insert(*it); // Spawn thread here. tthread::thread thisThread(parseThread, this); thisThread.detach(); } } } threadCheckTimer = Util::bootSecs(); } if (Util::bootSecs() - noDataSince > 20){ if (!isAlwaysOn()){ config->is_active = false; Util::logExitReason("no packets received for 20 seconds"); return; }else{ noDataSince = Util::bootSecs(); } } } } void inputTS::finish(){ if (standAlone){ Input::finish(); return; } int threadCount = 0; do{ { tthread::lock_guard guard(threadClaimMutex); threadCount = threadTimer.size(); } if (threadCount){Util::sleep(100);} }while (threadCount); } bool inputTS::needsLock(){ // we already know no lock will be needed if (!standAlone){return false;} // otherwise, check input param const std::string &inpt = config->getString("input"); if (inpt.size() && inpt != "-" && inpt.substr(0, 9) != "stream://" && inpt.substr(0, 8) != "tsudp://" && inpt.substr(0, 8) != "ts-exec:" && inpt.substr(0, 6) != "srt://" && inpt.substr(0, 7) != "http://" && inpt.substr(0, 10) != "http-ts://" && inpt.substr(0, 8) != "https://" && inpt.substr(0, 11) != "https-ts://"){ return Input::needsLock(); } return false; } }// namespace Mist