diff --git a/CMakeLists.txt b/CMakeLists.txt index ca826dfd..6e18702a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -160,6 +160,7 @@ set(libHeaders ${SOURCE_DIR}/lib/tinythread.h ${SOURCE_DIR}/lib/ts_packet.h ${SOURCE_DIR}/lib/ts_stream.h + ${SOURCE_DIR}/lib/util.h ${SOURCE_DIR}/lib/vorbis.h ${SOURCE_DIR}/lib/triggers.h ) @@ -203,6 +204,7 @@ set(libSources ${SOURCE_DIR}/lib/tinythread.cpp ${SOURCE_DIR}/lib/ts_packet.cpp ${SOURCE_DIR}/lib/ts_stream.cpp + ${SOURCE_DIR}/lib/util.cpp ${SOURCE_DIR}/lib/vorbis.cpp ${SOURCE_DIR}/lib/triggers.cpp ) @@ -290,9 +292,6 @@ macro(makeInput inputName format) #Set compile definitions unset(my_definitions) - if (";${ARGN};" MATCHES ";nolock;")#Currently only used in TSStream - list(APPEND my_definitions "INPUT_NOLOCK") - endif() if (";${ARGN};" MATCHES ";tslive;") list(APPEND my_definitions "TSLIVE_INPUT") endif() @@ -320,7 +319,7 @@ makeInput(Buffer buffer) makeInput(ISMV ismv)#LTS makeInput(MP4 mp4)#LTS makeInput(TS ts)#LTS -makeInput(TSStream ts nolock tslive)#LTS +makeInput(TSStream ts tslive)#LTS makeInput(Folder folder folder)#LTS ######################################## diff --git a/flow_input b/flow_input index 3aa9e66f..bb41cc0b 100644 --- a/flow_input +++ b/flow_input @@ -1,7 +1,7 @@ - Construct input - Parse arguments -- Stream wordt gelocked IFF !nolock +- Stream wordt gelocked IFF conv.needsLock() - Start .run() - setup(): opent files/sockets/etc waar nodig - set "isStream" naar true diff --git a/lib/config.cpp b/lib/config.cpp index 75c7dda1..33316af0 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -264,6 +264,10 @@ bool Util::Config::parseArgs(int & argc, char ** & argv) { return true; } +bool Util::Config::hasOption(const std::string & optname) { + return vals.isMember(optname); +} + /// Returns a reference to the current value of an option or default if none was set. /// If the option does not exist, this exits the application with a return code of 37. JSON::Value & Util::Config::getOption(std::string optname, bool asArray) { diff --git a/lib/config.h b/lib/config.h index dcd3bad7..bdefe569 100644 --- a/lib/config.h +++ b/lib/config.h @@ -30,6 +30,7 @@ namespace Util { void addOption(std::string optname, JSON::Value option); void printHelp(std::ostream & output); bool parseArgs(int & argc, char ** & argv); + bool hasOption(const std::string & optname); JSON::Value & getOption(std::string optname, bool asArray = false); std::string getString(std::string optname); long long int getInteger(std::string optname); diff --git a/lib/dtsc.h b/lib/dtsc.h index fcf0152a..94f524c1 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -109,6 +109,7 @@ namespace DTSC { void operator = (const Packet & rhs); operator bool() const; packType getVersion() const; + void reInit(Socket::Connection & src); void reInit(const char * data_, unsigned int len, bool noCopy = false); void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, long long packBytePos, bool isKeyframe); void getString(const char * identifier, char *& result, unsigned int & len) const; @@ -354,8 +355,8 @@ namespace DTSC { void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000); LTS*/ void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000, const char * iVec = 0); - unsigned int getSendLen(bool skipDynamic = false); - void send(Socket::Connection & conn, bool skipDynamic = false); + unsigned int getSendLen(bool skipDynamic = false, std::set selectedTracks = std::set()); + void send(Socket::Connection & conn, bool skipDynamic = false, std::set selectedTracks = std::set()); void writeTo(char * p); JSON::Value toJSON(); void reset(); diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index c45ce903..64cf84ed 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -109,6 +109,32 @@ namespace DTSC { } } + void Packet::reInit(Socket::Connection & src) { + int sleepCount = 0; + null(); + int toReceive = 0; + while (src.connected()){ + if (!toReceive && src.Received().available(8)){ + if (src.Received().copy(2) != "DT"){ + INFO_MSG("Invalid DTSC Packet header encountered (%s)", src.Received().copy(4).c_str()); + break; + } + toReceive = Bit::btohl(src.Received().copy(8).data() + 4); + } + if (toReceive && src.Received().available(toReceive + 8)){ + std::string dataBuf = src.Received().remove(toReceive + 8); + reInit(dataBuf.data(), dataBuf.size()); + return; + } + if(!src.spool()){ + if (sleepCount++ > 5){ + return; + } + Util::sleep(500); + } + } + } + ///\brief Initializes a packet with new data ///\param data_ The new data for the packet ///\param len The length of the data pointed to by data_ @@ -1530,7 +1556,7 @@ namespace DTSC { } else if (type == "video") { result += 48; } - if (missedFrags) { + if (!skipDynamic && missedFrags) { result += 23; } return result; @@ -1709,10 +1735,12 @@ namespace DTSC { } ///\brief Determines the "packed" size of a meta object - unsigned int Meta::getSendLen(bool skipDynamic) { + unsigned int Meta::getSendLen(bool skipDynamic, std::set selectedTracks) { unsigned int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { - dataLen += it->second.getSendLen(skipDynamic); + if (!selectedTracks.size() || selectedTracks.count(it->first)){ + dataLen += it->second.getSendLen(skipDynamic); + } } return dataLen + 8; //add 8 bytes header } @@ -1749,13 +1777,15 @@ namespace DTSC { } ///\brief Writes a meta object to a socket - void Meta::send(Socket::Connection & conn, bool skipDynamic) { - int dataLen = getSendLen(skipDynamic) - 8; //strip 8 bytes header + void Meta::send(Socket::Connection & conn, bool skipDynamic, std::set selectedTracks) { + int dataLen = getSendLen(skipDynamic, selectedTracks) - 8; //strip 8 bytes header conn.SendNow(DTSC::Magic_Header, 4); conn.SendNow(convertInt(dataLen), 4); conn.SendNow("\340\000\006tracks\340", 10); for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { - it->second.send(conn, skipDynamic); + if (!selectedTracks.size() || selectedTracks.count(it->first)){ + it->second.send(conn, skipDynamic); + } } conn.SendNow("\000\000\356", 3);//End tracks object if (vod) { diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index f1618206..f149c834 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -1068,6 +1068,54 @@ namespace IPC { ///\brief The deconstructor sharedClient::~sharedClient() { mySemaphore.close(); + + + } + + bool sharedClient::isSingleEntry() { + semaphore tmpSem(baseName.c_str(), O_RDWR); + + if (!tmpSem) { + HIGH_MSG("Creating semaphore %s failed: %s, assuming we're alone", baseName.c_str(), strerror(errno)); + return true; + } + //Empty is used to compare for emptyness. This is not needed when the page uses a counter + char * empty = 0; + if (!hasCounter) { + empty = (char *)malloc(payLen * sizeof(char)); + if (!empty) { + HIGH_MSG("Failed to allocate %u bytes for empty payload, assuming we're not alone", payLen); + return false; + } + memset(empty, 0, payLen); + } + bool result = true; + { + semGuard tmpGuard(&tmpSem); + for (char i = 'A'; i <= 'Z'; i++) { + sharedPage tmpPage(baseName.substr(1) + i, (4096 << (i - 'A')), false, false); + if (!tmpPage.mapped) { + break; + } + int offset = 0; + while (offset + payLen + (hasCounter ? 1 : 0) <= tmpPage.len) { + //Skip our own entry + if (tmpPage.name == myPage.name && offset == offsetOnPage){ + offset += payLen + (hasCounter ? 1 : 0); + continue; + } + if (!((hasCounter && tmpPage.mapped[offset] == 0) || (!hasCounter && !memcmp(tmpPage.mapped + offset, empty, payLen)))) { + result = false; + break; + } + offset += payLen + (hasCounter ? 1 : 0); + } + } + } + if (empty) { + free(empty); + } + return result; } ///\brief Writes data to the shared data @@ -1113,6 +1161,16 @@ namespace IPC { } return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0)); } + + int sharedClient::getCounter() { + if (!hasCounter){ + return -1; + } + if (!myPage.mapped) { + return 0; + } + return *(myPage.mapped + offsetOnPage); + } userConnection::userConnection(char * _data) { data = _data; diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 9bac591e..3ee041bf 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -60,7 +60,7 @@ namespace IPC { class semaphore { public: semaphore(); - semaphore(const char * name, int oflag, mode_t mode, unsigned int value); + semaphore(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0); ~semaphore(); operator bool() const; void open(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0); @@ -220,6 +220,8 @@ namespace IPC { void finish(); void keepAlive(); char * getData(); + int getCounter(); + bool isSingleEntry(); private: ///\brief The basename of the shared pages. std::string baseName; diff --git a/lib/stream.cpp b/lib/stream.cpp index 39c91100..59300bd4 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -204,6 +204,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir //check in curConf for capabilities-inputs--priority/source_match std::string player_bin; + bool pullMode = false; bool selected = false; long long int curPrio = -1; DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); @@ -224,6 +225,20 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir curPrio = input.getMember("priority").asInt(); selected = true; } + + if (input.hasMember("stream_match")){ + source = input.getMember("stream_match").asString(); + front = source.substr(0,source.find('*')); + back = source.substr(source.find('*')+1); + DEBUG_MSG(DLVL_MEDIUM, "Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str()); + + if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){ + player_bin = Util::getMyPath() + "MistIn" + input.getMember("name").asString(); + curPrio = input.getMember("priority").asInt(); + pullMode = true; + selected = true; + } + } } } @@ -261,9 +276,16 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir //finally, unlock the config semaphore configLock.post(); - DEBUG_MSG(DLVL_MEDIUM, "Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str()); + if (pullMode){ + DEBUG_MSG(DLVL_MEDIUM, "Starting %s -p -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str()); + }else{ + DEBUG_MSG(DLVL_MEDIUM, "Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str()); + } char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()}; int argNum = 3; + if (pullMode){ + argv[++argNum] = (char*)"--pull"; + } std::string debugLvl; if (Util::Config::printDebugLevel != DEBUG && !str_args.count("--debug")){ debugLvl = JSON::Value((long long)Util::Config::printDebugLevel).asString(); diff --git a/lib/util.cpp b/lib/util.cpp new file mode 100644 index 00000000..2c44cf47 --- /dev/null +++ b/lib/util.cpp @@ -0,0 +1,40 @@ +#include "util.h" +#include + +namespace Util { + bool stringScan(const std::string & src, const std::string & pattern, std::deque & result){ + result.clear(); + std::deque positions; + size_t pos = pattern.find("%", 0); + while (pos != std::string::npos){ + positions.push_back(pos); + pos = pattern.find("%", pos + 1); + } + if (positions.size() == 0){ + return false; + } + size_t sourcePos = 0; + size_t patternPos = 0; + std::deque::iterator posIter = positions.begin(); + while (sourcePos != std::string::npos){ + //Match first part of the string + if (pattern.substr(patternPos, *posIter - patternPos) != src.substr(sourcePos, *posIter - patternPos)){ + break; + } + sourcePos += *posIter - patternPos; + std::deque::iterator nxtIter = posIter + 1; + if (nxtIter != positions.end()){ + patternPos = *posIter+2; + size_t tmpPos = src.find(pattern.substr(*posIter+2, *nxtIter - patternPos), sourcePos); + result.push_back(src.substr(sourcePos, tmpPos - sourcePos)); + sourcePos = tmpPos; + }else{ + result.push_back(src.substr(sourcePos)); + sourcePos = std::string::npos; + } + posIter++; + } + return result.size() == positions.size(); + } +} + diff --git a/lib/util.h b/lib/util.h new file mode 100644 index 00000000..df0a27b9 --- /dev/null +++ b/lib/util.h @@ -0,0 +1,6 @@ +#include +#include + +namespace Util { + bool stringScan(const std::string & src, const std::string & pattern, std::deque & result); +} diff --git a/src/input/input.cpp b/src/input/input.cpp index 7536c185..7e085f3d 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -83,6 +83,7 @@ namespace Mist { singleton = this; isBuffer = false; + streamMode = false; } void Input::checkHeaderTimes(std::string streamFile) { @@ -115,16 +116,27 @@ namespace Mist { } } + bool Input::needsLock() { + return !(config->hasOption("pull") && config->getBool("pull")); + } + int Input::run() { + if (config->getBool("json")) { + std::cout << capa.toString() << std::endl; + return 0; + } + if (streamName != "") { config->getOption("streamname") = streamName; } streamName = config->getString("streamname"); nProxy.streamName = streamName; - if (config->getBool("json")) { - std::cout << capa.toString() << std::endl; - return 0; - } + + + streamMode = config->hasOption("pull") && config->getBool("pull"); + INFO_MSG("Stream %s in %s mode", streamName.c_str(), streamMode ? "stream" : "non-stream"); + + if (!setup()) { std::cerr << config->getString("cmd") << " setup failed." << std::endl; return 0; @@ -139,7 +151,9 @@ namespace Mist { if (!streamName.size()) { convert(); - } else { + } else if (streamMode) { + stream(); + }else{ serve(); } return 0; @@ -243,53 +257,92 @@ namespace Mist { /// Main loop for stream-style inputs. /// This loop will start the buffer without resume support, and then repeatedly call ..... followed by .... void Input::stream(){ + IPC::semaphore pullLock; + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock.tryWait()){ + DEBUG_MSG(DLVL_DEVEL, "A pull process for stream %s is already running", streamName.c_str()); + return; + } + if (Util::streamAlive(streamName)){ + pullLock.post(); + pullLock.close(); + return; + } + if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer + pullLock.post(); + pullLock.close(); + return; + } + char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - /*LTS-START*/ - if(Triggers::shouldTrigger("STREAM_READY", config->getString("streamname"))){ - std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n"; - if (!Triggers::doTrigger("STREAM_READY", payload, config->getString("streamname"))){ - config->is_active = false; - } - } - /*LTS-END*/ - userPage.init(userPageName, PLAY_EX_SIZE, true); - if (!isBuffer) { - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { - bufferFrame(it->first, 1); - } - } + nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); - long long int activityCounter = Util::bootSecs(); - while ((Util::bootSecs() - activityCounter) < 10 && config->is_active) { //10 second timeout - userPage.parseEach(callbackWrapper); - removeUnused(); - if (userPage.amount) { - activityCounter = Util::bootSecs(); - DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount); - } else { - DEBUG_MSG(DLVL_INSANE, "Timer running"); - } - /*LTS-START*/ - if ((Util::bootSecs() - activityCounter) >= 10 || !config->is_active){//10 second timeout - if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){ - std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n"; - if (!Triggers::doTrigger("STREAM_UNLOAD", payload, config->getString("streamname"))){ - activityCounter = Util::bootSecs(); - config->is_active = true; + if (!openStreamSource()){ + FAIL_MSG("Unable to connect to source"); + pullLock.post(); + pullLock.close(); + return; + } + parseStreamHeader(); + + if (myMeta.tracks.size() == 0){ + nProxy.userClient.finish(); + finish(); + pullLock.post(); + pullLock.close(); + return; + } + + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + it->second.firstms = 0; + it->second.lastms = 0; + } + + getNext(); + unsigned long long lastTime = Util::getMS(); + unsigned long long lastActive = Util::getMS(); + while (thisPacket && config->is_active){ + nProxy.bufferLivePacket(thisPacket, myMeta); + getNext(); + nProxy.userClient.keepAlive(); + if (Util::getMS() - lastTime >= 1000){ + lastTime = Util::getMS(); + if (nProxy.userClient.isSingleEntry()){ + if (lastTime - lastActive >= 10000){//10sec timeout + config->is_active = false; } + }else{ + lastActive = lastTime; } } - /*LTS-END*/ - if (config->is_active){ - Util::sleep(1000); + } + + closeStreamSource(); + + while (config->is_active){ + Util::sleep(500); + nProxy.userClient.keepAlive(); + if (Util::getMS() - lastTime >= 1000){ + lastTime = Util::getMS(); + if (nProxy.userClient.isSingleEntry()){ + if (lastTime - lastActive >= 10000){//10sec timeout + config->is_active = false; + } + }else{ + lastActive = lastTime; + } } } + + + nProxy.userClient.finish(); finish(); - DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str()); - //end player functionality + pullLock.post(); + pullLock.close(); + return; } void Input::finish() { diff --git a/src/input/input.h b/src/input/input.h index b48adbbc..b3940721 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -23,6 +23,8 @@ namespace Mist { virtual void onCrash(){} virtual void argumentsParsed(){} virtual ~Input() {}; + + virtual bool needsLock(); protected: static void callbackWrapper(char * data, size_t len, unsigned int id); virtual bool setup() = 0; @@ -31,6 +33,9 @@ namespace Mist { virtual void getNext(bool smart = true) {}; virtual void seek(int seekTime){}; virtual void finish(); + virtual bool openStreamSource() { return false; }; + virtual void closeStreamSource() {}; + virtual void parseStreamHeader() {}; void play(int until = 0); void playOnce(); void quitPlay(); @@ -40,6 +45,9 @@ namespace Mist { virtual void userCallback(char * data, size_t len, unsigned int id); virtual void convert(); virtual void serve(); + virtual void stream(); + bool streamMode; + virtual void parseHeader(); bool bufferFrame(unsigned int track, unsigned int keyNum); diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index c550e7a3..329cbe90 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -23,7 +23,7 @@ /*LTS-END*/ namespace Mist { - inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg){ + inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg) { capa["name"] = "Buffer"; JSON::Value option; option["arg"] = "integer"; @@ -94,7 +94,7 @@ namespace Mist { capa["optional"]["segmentsize"]["type"] = "uint"; capa["optional"]["segmentsize"]["default"] = 5000LL; option.null(); - + option["arg"] = "string"; option["long"] = "udp-port"; option["short"] = "U"; @@ -139,39 +139,39 @@ namespace Mist { resumeMode = false; } - inputBuffer::~inputBuffer(){ + inputBuffer::~inputBuffer() { config->is_active = false; - if (myMeta.tracks.size()){ + if (myMeta.tracks.size()) { /*LTS-START*/ - if (myMeta.bufferWindow){ - if(Triggers::shouldTrigger("STREAM_BUFFER")){ - std::string payload = config->getString("streamname")+"\nEMPTY"; + if (myMeta.bufferWindow) { + if (Triggers::shouldTrigger("STREAM_BUFFER")) { + std::string payload = config->getString("streamname") + "\nEMPTY"; Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname")); } } /*LTS-END*/ DEBUG_MSG(DLVL_DEVEL, "Cleaning up, removing last keyframes"); - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { std::map & locations = bufferLocations[it->first]; - if (!nProxy.metaPages.count(it->first) || !nProxy.metaPages[it->first].mapped){ + if (!nProxy.metaPages.count(it->first) || !nProxy.metaPages[it->first].mapped) { continue; } //First detect all entries on metaPage - for (int i = 0; i < 8192; i += 8){ + for (int i = 0; i < 8192; i += 8) { int * tmpOffset = (int *)(nProxy.metaPages[it->first].mapped + i); - if (tmpOffset[0] == 0 && tmpOffset[1] == 0){ + if (tmpOffset[0] == 0 && tmpOffset[1] == 0) { continue; } unsigned long keyNum = ntohl(tmpOffset[0]); //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. - if (!locations.count(keyNum)){ + if (!locations.count(keyNum)) { locations[keyNum].curOffset = 0; } locations[keyNum].pageNum = keyNum; locations[keyNum].keyNum = ntohl(tmpOffset[1]); } - for (std::map::iterator it2 = locations.begin(); it2 != locations.end(); it2++){ + for (std::map::iterator it2 = locations.begin(); it2 != locations.end(); it2++) { char thisPageName[NAME_BUFFER_SIZE]; snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), it->first, it2->first); IPC::sharedPage erasePage(thisPageName, 20971520); @@ -244,48 +244,50 @@ namespace Mist { } } - /// \triggers + /// \triggers /// The `"STREAM_BUFFER"` trigger is stream-specific, and is ran whenever the buffer changes state between playable (FULL) or not (EMPTY). It cannot be cancelled. It is possible to receive multiple EMPTY calls without FULL calls in between, as EMPTY is always generated when a stream is unloaded from memory, even if this stream never reached playable state in the first place (e.g. a broadcast was cancelled before filling enough buffer to be playable). Its payload is: /// ~~~~~~~~~~~~~~~ /// streamname /// FULL or EMPTY (depending on current state) /// ~~~~~~~~~~~~~~~ - void inputBuffer::updateMeta(){ + void inputBuffer::updateMeta() { static long long unsigned int lastFragCount = 0xFFFFull; long long unsigned int firstms = 0xFFFFFFFFFFFFFFFFull; long long unsigned int lastms = 0; long long unsigned int fragCount = 0xFFFFull; - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (it->second.type == "meta" || !it->second.type.size()){continue;} - if (it->second.init.size()){ - if (!initData.count(it->first) || initData[it->first] != it->second.init){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { + if (it->second.type == "meta" || !it->second.type.size()) { + continue; + } + if (it->second.init.size()) { + if (!initData.count(it->first) || initData[it->first] != it->second.init) { initData[it->first] = it->second.init; } - }else{ - if (initData.count(it->first)){ + } else { + if (initData.count(it->first)) { it->second.init = initData[it->first]; } } - if (it->second.fragments.size() < fragCount){ + if (it->second.fragments.size() < fragCount) { fragCount = it->second.fragments.size(); } - if (it->second.firstms < firstms){ + if (it->second.firstms < firstms) { firstms = it->second.firstms; } - if (it->second.lastms > lastms){ + if (it->second.lastms > lastms) { lastms = it->second.lastms; } } /*LTS-START*/ - if (fragCount >= FRAG_BOOT && fragCount != 0xFFFFull && (lastFragCount == 0xFFFFull || lastFragCount < FRAG_BOOT)){ - if(Triggers::shouldTrigger("STREAM_BUFFER")){ - std::string payload = config->getString("streamname")+"\nFULL"; + if (fragCount >= FRAG_BOOT && fragCount != 0xFFFFull && (lastFragCount == 0xFFFFull || lastFragCount < FRAG_BOOT)) { + if (Triggers::shouldTrigger("STREAM_BUFFER")) { + std::string payload = config->getString("streamname") + "\nFULL"; Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname")); } } - if ((fragCount < FRAG_BOOT || fragCount == 0xFFFFull) && (lastFragCount >= FRAG_BOOT && lastFragCount != 0xFFFFull)){ - if(Triggers::shouldTrigger("STREAM_BUFFER")){ - std::string payload = config->getString("streamname")+"\nEMPTY"; + if ((fragCount < FRAG_BOOT || fragCount == 0xFFFFull) && (lastFragCount >= FRAG_BOOT && lastFragCount != 0xFFFFull)) { + if (Triggers::shouldTrigger("STREAM_BUFFER")) { + std::string payload = config->getString("streamname") + "\nEMPTY"; Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname")); } } @@ -298,7 +300,7 @@ namespace Mist { snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1); liveMeta.wait(); - if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped){ + if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped) { char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); nProxy.metaPages[0].init(pageName, DEFAULT_META_PAGE_SIZE, true); @@ -309,19 +311,19 @@ namespace Mist { liveMeta.post(); } - bool inputBuffer::removeKey(unsigned int tid){ - if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active){ + bool inputBuffer::removeKey(unsigned int tid) { + if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active) { return false; } - if (!myMeta.tracks[tid].keys.size()){ + if (!myMeta.tracks[tid].keys.size()) { return false; } DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber()); //remove all parts of this key - for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++){ + for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) { /*LTS-START*/ - if (recFile.is_open()){ - if (!recMeta.tracks.count(tid)){ + if (recFile.is_open()) { + if (!recMeta.tracks.count(tid)) { recMeta.tracks[tid] = myMeta.tracks[tid]; recMeta.tracks[tid].reset(); } @@ -335,12 +337,12 @@ namespace Mist { //re-calculate firstms myMeta.tracks[tid].firstms = myMeta.tracks[tid].keys[0].getTime(); //delete the fragment if it's no longer fully buffered - if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()){ + if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()) { myMeta.tracks[tid].fragments.pop_front(); myMeta.tracks[tid].missedFrags ++; } //if there is more than one page buffered for this track... - if (bufferLocations[tid].size() > 1){ + if (bufferLocations[tid].size() > 1) { //Check if the first key starts on the second page or higher if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){ DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); @@ -361,11 +363,11 @@ namespace Mist { return true; } - void inputBuffer::eraseTrackDataPages(unsigned long tid){ - if (!bufferLocations.count(tid)){ + void inputBuffer::eraseTrackDataPages(unsigned long tid) { + if (!bufferLocations.count(tid)) { return; } - for (std::map::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++){ + for (std::map::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++) { char thisPageName[NAME_BUFFER_SIZE]; snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tid, it->first); nProxy.curPage[tid].init(thisPageName, 20971520, false, false); @@ -380,68 +382,68 @@ namespace Mist { void inputBuffer::finish() { Input::finish(); updateMeta(); - if (bufferLocations.size()){ + if (bufferLocations.size()) { std::set toErase; - for (std::map >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++){ + for (std::map >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++) { toErase.insert(it->first); } - for (std::set::iterator it = toErase.begin(); it != toErase.end(); ++it){ + for (std::set::iterator it = toErase.begin(); it != toErase.end(); ++it) { eraseTrackDataPages(*it); } } } - /// \triggers + /// \triggers /// The `"STREAM_TRACK_REMOVE"` trigger is stream-specific, and is ran whenever a track is fully removed from a live strean buffer. It cannot be cancelled. Its payload is: /// ~~~~~~~~~~~~~~~ /// streamname /// trackID /// ~~~~~~~~~~~~~~~ - void inputBuffer::removeUnused(){ + void inputBuffer::removeUnused() { //first remove all tracks that have not been updated for too long bool changed = true; - while (changed){ + while (changed) { changed = false; long long unsigned int time = Util::bootSecs(); long long unsigned int compareFirst = 0xFFFFFFFFFFFFFFFFull; long long unsigned int compareLast = 0; //for tracks that were updated in the last 5 seconds, get the first and last ms edges. - for (std::map::iterator it2 = myMeta.tracks.begin(); it2 != myMeta.tracks.end(); it2++){ - if ((time - lastUpdated[it2->first]) > 5){ + for (std::map::iterator it2 = myMeta.tracks.begin(); it2 != myMeta.tracks.end(); it2++) { + if ((time - lastUpdated[it2->first]) > 5) { continue; } - if (it2->second.lastms > compareLast){ + if (it2->second.lastms > compareLast) { compareLast = it2->second.lastms; } - if (it2->second.firstms < compareFirst){ + if (it2->second.firstms < compareFirst) { compareFirst = it2->second.firstms; } } - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { //if not updated for an entire buffer duration, or last updated track and this track differ by an entire buffer duration, erase the track. if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000) || - (compareLast && (long long int)(time - lastUpdated[it->first]) > 5 && ( - (compareLast < it->second.firstms && (long long int)(it->second.firstms - compareLast) > bufferTime) - || - (compareFirst > it->second.lastms && (long long int)(compareFirst - it->second.lastms) > bufferTime) - )) - ){ + (compareLast && (long long int)(time - lastUpdated[it->first]) > 5 && ( + (compareLast < it->second.firstms && (long long int)(it->second.firstms - compareLast) > bufferTime) + || + (compareFirst > it->second.lastms && (long long int)(compareFirst - it->second.lastms) > bufferTime) + )) + ) { unsigned int tid = it->first; //erase this track - if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000)){ + if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000)) { INFO_MSG("Erasing track %d because not updated for %ds (> %ds)", it->first, (long long int)(time - lastUpdated[it->first]), (long long int)(bufferTime / 1000)); - }else{ - INFO_MSG("Erasing inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst/1000, compareLast/1000, bufferTime / 1000); + } else { + INFO_MSG("Erasing inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000); } /*LTS-START*/ - if(Triggers::shouldTrigger("STREAM_TRACK_REMOVE")){ - std::string payload = config->getString("streamname")+"\n"+JSON::Value((long long)it->first).asString()+"\n"; + if (Triggers::shouldTrigger("STREAM_TRACK_REMOVE")) { + std::string payload = config->getString("streamname") + "\n" + JSON::Value((long long)it->first).asString() + "\n"; Triggers::doTrigger("STREAM_TRACK_REMOVE", payload, config->getString("streamname")); - } + } /*LTS-END*/ lastUpdated.erase(tid); /// \todo Consider replacing with eraseTrackDataPages(it->first)? - while (bufferLocations[tid].size()){ + while (bufferLocations[tid].size()) { char thisPageName[NAME_BUFFER_SIZE]; snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), (unsigned long)tid, bufferLocations[tid].begin()->first); nProxy.curPage[tid].init(thisPageName, 20971520); @@ -477,14 +479,14 @@ namespace Mist { } //find the earliest video keyframe stored unsigned int firstVideo = 1; - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (it->second.type == "video"){ - if (it->second.firstms < firstVideo || firstVideo == 1){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { + if (it->second.type == "video") { + if (it->second.firstms < firstVideo || firstVideo == 1) { firstVideo = it->second.firstms; } } } - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { //non-video tracks need to have a second keyframe that is <= firstVideo //firstVideo = 1 happens when there are no tracks, in which case we don't care any more if (it->second.type != "video"){ @@ -493,39 +495,39 @@ namespace Mist { } } //Buffer cutting - while (it->second.keys.size() > 1 && it->second.keys[0].getTime() < cutTime){ - if (!removeKey(it->first)){ + while (it->second.keys.size() > 1 && it->second.keys[0].getTime() < cutTime) { + if (!removeKey(it->first)) { break; } } //Buffer size management - while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime){ - if (!removeKey(it->first)){ + while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime) { + if (!removeKey(it->first)) { break; } } } updateMeta(); static bool everHadPush = false; - if (hasPush){ + if (hasPush) { hasPush = false; everHadPush = true; - }else if(everHadPush && !resumeMode && config->is_active){ + } else if (everHadPush && !resumeMode && config->is_active) { INFO_MSG("Shutting down buffer because resume mode is disabled and the source disconnected"); config->is_active = false; } } - /// \triggers + /// \triggers /// The `"STREAM_TRACK_ADD"` trigger is stream-specific, and is ran whenever a new track is added to a live strean buffer. It cannot be cancelled. Its payload is: /// ~~~~~~~~~~~~~~~ /// streamname /// trackID /// ~~~~~~~~~~~~~~~ - void inputBuffer::userCallback(char * data, size_t len, unsigned int id){ + void inputBuffer::userCallback(char * data, size_t len, unsigned int id) { /*LTS-START*/ //Reload the configuration to make sure we stay up to date with changes through the api - if (Util::epoch() - lastReTime > 4){ + if (Util::epoch() - lastReTime > 4) { setup(); } /*LTS-END*/ @@ -535,27 +537,27 @@ namespace Mist { char counter = (*(data - 1)); //Each user can have at maximum SIMUL_TRACKS elements in their userpage. IPC::userConnection userConn(data); - for (int index = 0; index < SIMUL_TRACKS; index++){ + for (int index = 0; index < SIMUL_TRACKS; index++) { //Get the track id from the current element unsigned long value = userConn.getTrackId(index); //Skip value 0xFFFFFFFF as this indicates a previously declined track - if (value == 0xFFFFFFFF){ + if (value == 0xFFFFFFFF) { continue; } //Skip value 0 as this indicates an empty track - if (value == 0){ + if (value == 0) { continue; } //If the current value indicates a valid trackid, and it is pushed from this user - if (pushLocation[value] == data){ + if (pushLocation[value] == data) { //Check for timeouts, and erase the track if necessary - if (counter == 126 || counter == 127 || counter == 254 || counter == 255){ + if (counter == 126 || counter == 127 || counter == 254 || counter == 255) { pushLocation.erase(value); - if (negotiatingTracks.count(value)){ + if (negotiatingTracks.count(value)) { negotiatingTracks.erase(value); } - if (activeTracks.count(value)){ + if (activeTracks.count(value)) { updateMeta(); eraseTrackDataPages(value); activeTracks.erase(value); @@ -568,7 +570,67 @@ namespace Mist { } //Track is set to "New track request", assign new track id and create shared memory page //This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process - if (value & 0x80000000){ + if (value & 0x80000000) { + if (value & 0x40000000) { + unsigned long finalMap = value & ~0xC0000000; + //Register the new track as an active track. + activeTracks.insert(finalMap); + //Register the time of registration as initial value for the lastUpdated field, plus an extra 5 seconds just to be sure. + lastUpdated[finalMap] = Util::bootSecs() + 5; + //Register the user thats is pushing this element + pushLocation[finalMap] = data; + //Initialize the metadata for this track + if (!myMeta.tracks.count(finalMap)) { + DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap); + + IPC::sharedPage tMeta; + + char tempMetaName[NAME_BUFFER_SIZE]; + snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), finalMap); + tMeta.init(tempMetaName, 8388608, false); + + //The page exist, now we try to read in the metadata of the track + + //Store the size of the dtsc packet to read. + unsigned int len = ntohl(((int *)tMeta.mapped)[1]); + //Temporary variable, won't be used again + unsigned int tempForReadingMeta = 0; + //Read in the metadata through a temporary JSON object + ///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately + JSON::Value tempJSONForMeta; + JSON::fromDTMI((const unsigned char *)tMeta.mapped + 8, len, tempForReadingMeta, tempJSONForMeta); + + tMeta.master = true; + + //Construct a metadata object for the current track + DTSC::Meta trackMeta(tempJSONForMeta); + + myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second; + myMeta.tracks[finalMap].firstms = 0; + myMeta.tracks[finalMap].lastms = 0; + + userConn.setTrackId(index, finalMap); + userConn.setKeynum(index, 0x0000); + + + char firstPage[NAME_BUFFER_SIZE]; + snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); + nProxy.metaPages[finalMap].init(firstPage, 8192, false); + INFO_MSG("Meh %d", finalMap); + + //Update the metadata for this track + updateTrackMeta(finalMap); + INFO_MSG("Setting hasPush to true, quickNegotiate"); + hasPush = true; + } + //Write the final mapped track number and keyframe number to the user page element + //This is used to resume pushing as well as pushing new tracks + userConn.setTrackId(index, finalMap); + userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size()); + //Update the metadata to reflect all changes + updateMeta(); + continue; + } //Set the temporary track id for this item, and increase the temporary value for use with the next track unsigned long long tempMapping = nextTempId++; //Add the temporary track id to the list of tracks that are currently being negotiated @@ -583,9 +645,9 @@ namespace Mist { } //The track id is set to the value of a track that we are currently negotiating about - if (negotiatingTracks.count(value)){ + if (negotiatingTracks.count(value)) { //If the metadata page for this track is not yet registered, initialize it - if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped){ + if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) { char tempMetaName[NAME_BUFFER_SIZE]; snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), value); nProxy.metaPages[value].init(tempMetaName, 8388608, false, false); @@ -593,7 +655,7 @@ namespace Mist { //If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later if (!nProxy.metaPages[value].mapped) { //remove the negotiation if it has timed out - if (++negotiationTimeout[value] >= 1000){ + if (++negotiationTimeout[value] >= 1000) { negotiatingTracks.erase(value); negotiationTimeout.erase(value); } @@ -615,7 +677,7 @@ namespace Mist { //If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call. if (!trackMeta.tracks.count(value)) { //remove the negotiation if it has timed out - if (++negotiationTimeout[value] >= 1000){ + if (++negotiationTimeout[value] >= 1000) { negotiatingTracks.erase(value); //Set master to true before erasing the page, because we are responsible for cleaning up unused pages nProxy.metaPages[value].master = true; @@ -630,10 +692,10 @@ namespace Mist { /*LTS-START*/ //Get the identifier for the track, and attempt colission detection. int collidesWith = -1; - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { //If the identifier of an existing track and the current track match, assume the are the same track and reject the negotiated one. ///\todo Maybe switch to a new form of detecting collisions, especially with regards to multiple audio languages and camera angles. - if (it->second.getIdentifier() == trackIdentifier){ + if (it->second.getIdentifier() == trackIdentifier) { collidesWith = it->first; break; } @@ -646,21 +708,21 @@ namespace Mist { nProxy.metaPages.erase(value); //Check if the track collides, and whether the track it collides with is active. - if (collidesWith != -1 && activeTracks.count(collidesWith)){/*LTS*/ + if (collidesWith != -1 && activeTracks.count(collidesWith)) { /*LTS*/ //Print a warning message and set the state of the track to rejected. WARN_MSG("Collision of temporary track %lu with existing track %d detected. Handling as a new valid track.", value, collidesWith); collidesWith = -1; } /*LTS-START*/ unsigned long finalMap = collidesWith; - if (finalMap == -1){ + if (finalMap == -1) { //No collision has been detected, assign a new final number finalMap = (myMeta.tracks.size() ? myMeta.tracks.rbegin()->first : 0) + 1; DEBUG_MSG(DLVL_DEVEL, "No colision detected for temporary track %lu from user %u, assigning final track number %lu", value, id, finalMap); - if(Triggers::shouldTrigger("STREAM_TRACK_ADD")){ - std::string payload = config->getString("streamname")+"\n"+JSON::Value((long long)finalMap).asString()+"\n"; + if (Triggers::shouldTrigger("STREAM_TRACK_ADD")) { + std::string payload = config->getString("streamname") + "\n" + JSON::Value((long long)finalMap).asString() + "\n"; Triggers::doTrigger("STREAM_TRACK_ADD", payload, config->getString("streamname")); - } + } } /*LTS-END*/ //Resume either if we have more than 1 keyframe on the replacement track (assume it was already pushing before the track "dissapeared") @@ -690,7 +752,7 @@ namespace Mist { //Register the user thats is pushing this element pushLocation[finalMap] = data; //Initialize the metadata for this track if it was not in place yet. - if (!myMeta.tracks.count(finalMap)){ + if (!myMeta.tracks.count(finalMap)) { DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap); myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second; myMeta.tracks[finalMap].trackID = finalMap; @@ -703,14 +765,14 @@ namespace Mist { updateMeta(); } //If the track is active, and this is the element responsible for pushing it - if (activeTracks.count(value) && pushLocation[value] == data){ + if (activeTracks.count(value) && pushLocation[value] == data) { //Open the track index page if we dont have it open yet - if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped){ + if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) { char firstPage[NAME_BUFFER_SIZE]; snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), value); nProxy.metaPages[value].init(firstPage, 8192, false, false); } - if (nProxy.metaPages[value].mapped){ + if (nProxy.metaPages[value].mapped) { //Update the metadata for this track updateTrackMeta(value); hasPush = true; @@ -719,7 +781,7 @@ namespace Mist { } } - void inputBuffer::updateTrackMeta(unsigned long tNum){ + void inputBuffer::updateTrackMeta(unsigned long tNum) { VERYHIGH_MSG("Updating meta for track %d", tNum); //Store a reference for easier access std::map & locations = bufferLocations[tNum]; @@ -728,34 +790,34 @@ namespace Mist { //First detect all entries on metaPage for (int i = 0; i < 8192; i += 8) { int * tmpOffset = (int *)(mappedPointer + i); - if (tmpOffset[0] == 0 && tmpOffset[1] == 0){ + if (tmpOffset[0] == 0 && tmpOffset[1] == 0) { continue; } unsigned long keyNum = ntohl(tmpOffset[0]); INSANE_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1])); //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. - if (!locations.count(keyNum)){ + if (!locations.count(keyNum)) { locations[keyNum].curOffset = 0; } locations[keyNum].pageNum = keyNum; locations[keyNum].keyNum = ntohl(tmpOffset[1]); } //Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest - for (std::map::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++){ + for (std::map::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) { updateMetaFromPage(tNum, pageIt->first); } updateMeta(); } - void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum){ + void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum) { VERYHIGH_MSG("Updating meta for track %d page %d", tNum, pageNum); DTSCPageData & pageData = bufferLocations[tNum][pageNum]; //If the current page is over its 8mb "splitting" boundary - if (pageData.curOffset > (8 * 1024 * 1024)){ + if (pageData.curOffset > (8 * 1024 * 1024)) { //And the last keyframe in the parsed metadata is further in the stream than this page - if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()){ + if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()) { //Assume the entire page is already parsed return; } @@ -764,14 +826,14 @@ namespace Mist { //Otherwise open and parse the page //Open the page if it is not yet open - if (!nProxy.curPageNum.count(tNum) || nProxy.curPageNum[tNum] != pageNum || !nProxy.curPage[tNum].mapped){ + if (!nProxy.curPageNum.count(tNum) || nProxy.curPageNum[tNum] != pageNum || !nProxy.curPage[tNum].mapped) { //DO NOT ERASE THE PAGE HERE, master is not set to true nProxy.curPageNum.erase(tNum); char nextPageName[NAME_BUFFER_SIZE]; snprintf(nextPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tNum, pageNum); nProxy.curPage[tNum].init(nextPageName, 20971520); //If the page can not be opened, stop here - if (!nProxy.curPage[tNum].mapped){ + if (!nProxy.curPage[tNum].mapped) { WARN_MSG("Could not open page: %s", nextPageName); return; } @@ -780,21 +842,21 @@ namespace Mist { DTSC::Packet tmpPack; - if (!nProxy.curPage[tNum].mapped[pageData.curOffset]){ + if (!nProxy.curPage[tNum].mapped[pageData.curOffset]) { VERYHIGH_MSG("No packet on page %lu for track %lu, waiting...", pageNum, tNum); return; } tmpPack.reInit(nProxy.curPage[tNum].mapped + pageData.curOffset, 0); //No new data has been written on the page since last update - if (!tmpPack){ + if (!tmpPack) { return; } lastUpdated[tNum] = Util::bootSecs(); - while (tmpPack){ + while (tmpPack) { //Update the metadata with this packet myMeta.update(tmpPack, segmentSize);/*LTS*/ //Set the first time when appropriate - if (pageData.firstTime == 0){ + if (pageData.firstTime == 0) { pageData.firstTime = tmpPack.getTime(); } //Update the offset on the page with the size of the current packet @@ -804,7 +866,7 @@ namespace Mist { } } - bool inputBuffer::setup(){ + bool inputBuffer::setup() { lastReTime = Util::epoch(); /*LTS*/ std::string strName = config->getString("streamname"); Util::sanitizeName(strName); @@ -816,10 +878,10 @@ namespace Mist { long long tmpNum; //if stream is configured and setting is present, use it, always - if (streamCfg && streamCfg.getMember("DVR")){ + if (streamCfg && streamCfg.getMember("DVR")) { tmpNum = streamCfg.getMember("DVR").asInt(); } else { - if (streamCfg){ + if (streamCfg) { //otherwise, if stream is configured use the default tmpNum = config->getOption("bufferTime", true)[0u].asInt(); } else { @@ -827,19 +889,21 @@ namespace Mist { tmpNum = config->getOption("bufferTime").asInt(); } } - if (tmpNum < 1000){tmpNum = 1000;} + if (tmpNum < 1000) { + tmpNum = 1000; + } //if the new value is different, print a message and apply it - if (bufferTime != tmpNum){ + if (bufferTime != tmpNum) { DEBUG_MSG(DLVL_DEVEL, "Setting bufferTime from %u to new value of %lli", bufferTime, tmpNum); bufferTime = tmpNum; } /*LTS-START*/ //if stream is configured and setting is present, use it, always - if (streamCfg && streamCfg.getMember("cut")){ + if (streamCfg && streamCfg.getMember("cut")) { tmpNum = streamCfg.getMember("cut").asInt(); } else { - if (streamCfg){ + if (streamCfg) { //otherwise, if stream is configured use the default tmpNum = config->getOption("cut", true)[0u].asInt(); } else { @@ -848,16 +912,16 @@ namespace Mist { } } //if the new value is different, print a message and apply it - if (cutTime != tmpNum){ + if (cutTime != tmpNum) { DEBUG_MSG(DLVL_DEVEL, "Setting cutTime from %u to new value of %lli", cutTime, tmpNum); cutTime = tmpNum; } //if stream is configured and setting is present, use it, always - if (streamCfg && streamCfg.getMember("resume")){ + if (streamCfg && streamCfg.getMember("resume")) { tmpNum = streamCfg.getMember("resume").asInt(); } else { - if (streamCfg){ + if (streamCfg) { //otherwise, if stream is configured use the default tmpNum = config->getOption("resume", true)[0u].asInt(); } else { @@ -866,16 +930,16 @@ namespace Mist { } } //if the new value is different, print a message and apply it - if (resumeMode != (bool)tmpNum){ - DEBUG_MSG(DLVL_DEVEL, "Setting resume mode from %s to new value of %s", resumeMode?"enabled":"disabled", tmpNum?"enabled":"disabled"); + if (resumeMode != (bool)tmpNum) { + DEBUG_MSG(DLVL_DEVEL, "Setting resume mode from %s to new value of %s", resumeMode ? "enabled" : "disabled", tmpNum ? "enabled" : "disabled"); resumeMode = tmpNum; } //if stream is configured and setting is present, use it, always - if (streamCfg && streamCfg.getMember("segmentsize")){ + if (streamCfg && streamCfg.getMember("segmentsize")) { tmpNum = streamCfg.getMember("segmentsize").asInt(); } else { - if (streamCfg){ + if (streamCfg) { //otherwise, if stream is configured use the default tmpNum = config->getOption("segmentsize", true)[0u].asInt(); } else { @@ -884,7 +948,7 @@ namespace Mist { } } //if the new value is different, print a message and apply it - if (segmentSize != tmpNum){ + if (segmentSize != tmpNum) { DEBUG_MSG(DLVL_DEVEL, "Setting segmentSize from %u to new value of %lli", segmentSize, tmpNum); segmentSize = tmpNum; } @@ -929,8 +993,8 @@ namespace Mist { bool has_keyframes = false; std::map::iterator it = myMeta.tracks.begin(); while (it != myMeta.tracks.end()) { - - DTSC::Track& tr = it->second; + + DTSC::Track & tr = it->second; if (tr.type != "video") { ++it; continue; @@ -943,33 +1007,23 @@ namespace Mist { ++it; } - if (streamCfg - && streamCfg.getMember("record") - && streamCfg.getMember("record").asString().size() > 0 - && has_keyframes - ) - { + if (streamCfg && streamCfg.getMember("record") && streamCfg.getMember("record").asString().size() > 0 && has_keyframes) { - // @todo check if output is already running ? - if (recordingPid == -1 && config != NULL){ - configLock.post(); - configLock.close(); - - INFO_MSG("The stream %s has a value specified for the recording. " - "We're going to start an output and record into %s", - config->getString("streamname").c_str(), - streamCfg.getMember("record").asString().c_str()); + // @todo check if output is already running ? + if (recordingPid == -1 && config != NULL) { - recordingPid = Util::startRecording(config->getString("streamname")); - if (recordingPid < 0) { - FAIL_MSG("Failed to start the recording for %s", config->getString("streamname").c_str()); - // @todo shouldn't we do configList.post(), configLock.close() and return false? - // @todo discuss with Jaron. 2015.09.26, remove this comment when discussed. - } - INFO_MSG("We started an output for recording with PID: %d", recordingPid); - return true; + INFO_MSG("The stream %s has a value specified for the recording. We're goint to start an output and record into %s", config->getString("streamname").c_str(), streamCfg.getMember("record").asString().c_str()); + + configLock.post(); + configLock.close(); + recordingPid = Util::startRecording(config->getString("streamname")); + if (recordingPid < 0) { + FAIL_MSG("Failed to start the recording for %s", config->getString("streamname").c_str()); } + INFO_MSG("We started an output for recording with PID: %d", recordingPid); + return true; } + } /* roxlu-end */ @@ -979,15 +1033,15 @@ namespace Mist { return true; } - bool inputBuffer::readHeader(){ + bool inputBuffer::readHeader() { return true; } - void inputBuffer::getNext(bool smart){} + void inputBuffer::getNext(bool smart) {} - void inputBuffer::seek(int seekTime){} + void inputBuffer::seek(int seekTime) {} - void inputBuffer::trackSelect(std::string trackSpec){} + void inputBuffer::trackSelect(std::string trackSpec) {} } diff --git a/src/input/input_dtsc.cpp b/src/input/input_dtsc.cpp index 165c1b5a..53eb74a0 100644 --- a/src/input/input_dtsc.cpp +++ b/src/input/input_dtsc.cpp @@ -7,6 +7,9 @@ #include #include +#include +#include + #include "input_dtsc.h" namespace Mist { @@ -15,6 +18,7 @@ namespace Mist { capa["desc"] = "Enables DTSC Input"; capa["priority"] = 9ll; capa["source_match"] = "/*.dtsc"; + capa["stream_match"] = "dtsc://*"; capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("H263"); capa["codecs"][0u][0u].append("VP6"); @@ -22,59 +26,253 @@ namespace Mist { capa["codecs"][0u][1u].append("AAC"); capa["codecs"][0u][1u].append("MP3"); capa["codecs"][0u][1u].append("vorbis"); + + + JSON::Value option; + option["long"] = "pull"; + option["short"] = "p"; + option["help"] = "Start this input in pull mode."; + option["value"].append(0ll); + config->addOption("pull", option); + } + + + void parseDTSCURI(const std::string & src, std::string & host, uint16_t & port, std::string & password, std::string & streamName) { + host = ""; + port = 4200; + password = ""; + streamName = ""; + std::deque matches; + if (Util::stringScan(src, "%s:%s@%s/%s", matches)) { + host = matches[0]; + port = atoi(matches[1].c_str()); + password = matches[2]; + streamName = matches[3]; + return; + } + //Using default streamname + if (Util::stringScan(src, "%s:%s@%s", matches)) { + host = matches[0]; + port = atoi(matches[1].c_str()); + password = matches[2]; + return; + } + //Without password + if (Util::stringScan(src, "%s:%s/%s", matches)) { + host = matches[0]; + port = atoi(matches[1].c_str()); + streamName = matches[2]; + return; + } + //Using default port + if (Util::stringScan(src, "%s@%s/%s", matches)) { + host = matches[0]; + password = matches[1]; + streamName = matches[2]; + return; + } + //Default port, no password + if (Util::stringScan(src, "%s/%s", matches)) { + host = matches[0]; + streamName = matches[1]; + return; + } + //No password, default streamname + if (Util::stringScan(src, "%s:%s", matches)) { + host = matches[0]; + port = atoi(matches[1].c_str()); + return; + } + //Default port and streamname + if (Util::stringScan(src, "%s@%s", matches)) { + host = matches[0]; + password = matches[1]; + return; + } + //Default port and streamname, no password + if (Util::stringScan(src, "%s", matches)) { + host = matches[0]; + return; + } + } + + void inputDTSC::parseStreamHeader() { + while (srcConn.connected()){ + srcConn.spool(); + if (srcConn.Received().available(8)){ + if (srcConn.Received().copy(4) == "DTCM" || srcConn.Received().copy(4) == "DTSC") { + // Command message + std::string toRec = srcConn.Received().copy(8); + unsigned long rSize = Bit::btohl(toRec.c_str() + 4); + if (!srcConn.Received().available(8 + rSize)) { + continue; //abort - not enough data yet + } + //Ignore initial DTCM message, as this is a "hi" message from the server + if (srcConn.Received().copy(4) == "DTCM"){ + srcConn.Received().remove(8 + rSize); + }else{ + std::string dataPacket = srcConn.Received().remove(8+rSize); + DTSC::Packet metaPack(dataPacket.data(), dataPacket.size()); + myMeta.reinit(metaPack); + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + continueNegotiate(it->first, true); + } + break; + } + }else{ + INFO_MSG("Received a wrong type of packet - '%s'", srcConn.Received().copy(4).c_str()); + break; + } + } + } + } + + bool inputDTSC::openStreamSource() { + std::string source = config->getString("input"); + if (source.find("dtsc://") == 0) { + source.erase(0, 7); + } + std::string host; + uint16_t port; + std::string password; + std::string streamName; + parseDTSCURI(source, host, port, password, streamName); + std::string givenStream = config->getString("streamname"); + if (streamName == "") { + streamName = givenStream; + }else{ + if (givenStream.find("+") != std::string::npos){ + streamName += givenStream.substr(givenStream.find("+")); + } + } + srcConn = Socket::Connection(host, port, true); + if (!srcConn.connected()){ + return false; + } + JSON::Value prep; + prep["cmd"] = "play"; + prep["version"] = "MistServer " PACKAGE_VERSION; + prep["stream"] = streamName; + srcConn.SendNow("DTCM"); + char sSize[4] = {0, 0, 0, 0}; + Bit::htobl(sSize, prep.packedSize()); + srcConn.SendNow(sSize, 4); + prep.sendTo(srcConn); + return true; + } + + void inputDTSC::closeStreamSource(){ + srcConn.close(); } bool inputDTSC::setup() { - if (config->getString("input") == "-") { - std::cerr << "Input from stdin not yet supported" << std::endl; - return false; - } - if (!config->getString("streamname").size()){ - if (config->getString("output") == "-") { - std::cerr << "Output to stdout not yet supported" << std::endl; + if (streamMode) { + return true; + } else { + if (config->getString("input") == "-") { + std::cerr << "Input from stdin not yet supported" << std::endl; return false; } - }else{ - if (config->getString("output") != "-") { - std::cerr << "File output in player mode not supported" << std::endl; + if (!config->getString("streamname").size()) { + if (config->getString("output") == "-") { + std::cerr << "Output to stdout not yet supported" << std::endl; + return false; + } + } else { + if (config->getString("output") != "-") { + std::cerr << "File output in player mode not supported" << std::endl; + return false; + } + } + + //open File + inFile = DTSC::File(config->getString("input")); + if (!inFile) { return false; } } - - //open File - inFile = DTSC::File(config->getString("input")); - if (!inFile) { - return false; - } return true; } bool inputDTSC::readHeader() { + if (streamMode) { + return true; + } if (!inFile) { return false; } DTSC::File tmp(config->getString("input") + ".dtsh"); if (tmp) { myMeta = tmp.getMeta(); - DEBUG_MSG(DLVL_HIGH,"Meta read in with %lu tracks", myMeta.tracks.size()); + DEBUG_MSG(DLVL_HIGH, "Meta read in with %lu tracks", myMeta.tracks.size()); return true; } if (inFile.getMeta().moreheader < 0 || inFile.getMeta().tracks.size() == 0) { - DEBUG_MSG(DLVL_FAIL,"Missing external header file"); + DEBUG_MSG(DLVL_FAIL, "Missing external header file"); return false; } myMeta = DTSC::Meta(inFile.getMeta()); - DEBUG_MSG(DLVL_DEVEL,"Meta read in with %lu tracks", myMeta.tracks.size()); + DEBUG_MSG(DLVL_DEVEL, "Meta read in with %lu tracks", myMeta.tracks.size()); return true; } - + void inputDTSC::getNext(bool smart) { - if (smart){ - inFile.seekNext(); + if (streamMode){ + thisPacket.reInit(srcConn); + if (thisPacket.getVersion() == DTSC::DTCM){ + std::string cmd; + thisPacket.getString("cmd", cmd); + if (cmd == "reset"){ + //Read next packet + thisPacket.reInit(srcConn); + if (thisPacket.getVersion() == DTSC::DTSC_HEAD){ + DTSC::Meta newMeta; + newMeta.reinit(thisPacket); + //Detect new tracks + std::set newTracks; + for (std::map::iterator it = newMeta.tracks.begin(); it != newMeta.tracks.end(); it++){ + if (!myMeta.tracks.count(it->first)){ + newTracks.insert(it->first); + } + } + + for (std::set::iterator it = newTracks.begin(); it != newTracks.end(); it++){ + INFO_MSG("Adding track %d to internal metadata", *it); + myMeta.tracks[*it] = newMeta.tracks[*it]; + continueNegotiate(*it, true); + } + + //Detect removed tracks + std::set deletedTracks; + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (!newMeta.tracks.count(it->first)){ + deletedTracks.insert(it->first); + } + } + + for(std::set::iterator it = deletedTracks.begin(); it != deletedTracks.end(); it++){ + INFO_MSG("Deleting track %d from internal metadata", *it); + myMeta.tracks.erase(*it); + } + + //Read next packet before returning + thisPacket.reInit(srcConn); + }else{ + myMeta = DTSC::Meta(); + } + }else{ + //Read next packet before returning + thisPacket.reInit(srcConn); + } + } }else{ - inFile.parseNext(); + if (smart) { + inFile.seekNext(); + } else { + inFile.parseNext(); + } + thisPacket = inFile.getPacket(); } - thisPacket = inFile.getPacket(); } void inputDTSC::seek(int seekTime) { diff --git a/src/input/input_dtsc.h b/src/input/input_dtsc.h index 9a9f12db..2e894493 100644 --- a/src/input/input_dtsc.h +++ b/src/input/input_dtsc.h @@ -7,6 +7,9 @@ namespace Mist { inputDTSC(Util::Config * cfg); protected: //Private Functions + bool openStreamSource(); + void closeStreamSource(); + void parseStreamHeader(); bool setup(); bool readHeader(); void getNext(bool smart = true); @@ -14,6 +17,8 @@ namespace Mist { void trackSelect(std::string trackSpec); DTSC::File inFile; + + Socket::Connection srcConn; }; } diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 1cbbe72d..34ccfdc4 100755 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -441,6 +441,10 @@ namespace Mist { } } while (threadCount); } + + bool inputTS::needsLock() { + return false; + } #endif } diff --git a/src/input/input_ts.h b/src/input/input_ts.h index fe29da67..645d4870 100755 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -13,6 +13,9 @@ namespace Mist { public: inputTS(Util::Config * cfg); ~inputTS(); +#ifdef TSLIVE_INPUT + bool needsLock(); +#endif protected: //Private Functions bool setup(); diff --git a/src/input/mist_in.cpp b/src/input/mist_in.cpp index d3319c59..1fa81f91 100644 --- a/src/input/mist_in.cpp +++ b/src/input/mist_in.cpp @@ -17,32 +17,33 @@ int main(int argc, char * argv[]) { if (conf.parseArgs(argc, argv)) { std::string streamName = conf.getString("streamname"); conv.argumentsParsed(); -#ifndef INPUT_NOLOCK + IPC::semaphore playerLock; - if (streamName.size()){ - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); - playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!playerLock.tryWait()){ - DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str()); - return 1; + if (conv.needsLock()){ + if (streamName.size()){ + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); + playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!playerLock.tryWait()){ + DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str()); + return 1; + } } } -#endif conf.activate(); while (conf.is_active){ pid_t pid = fork(); if (pid == 0){ -#ifndef INPUT_NOLOCK - playerLock.close(); -#endif + if (conv.needsLock()){ + playerLock.close(); + } return conv.run(); } if (pid == -1){ DEBUG_MSG(DLVL_FAIL, "Unable to spawn player process"); -#ifndef INPUT_NOLOCK - playerLock.post(); -#endif + if (conv.needsLock()){ + playerLock.post(); + } return 2; } //wait for the process to exit @@ -71,11 +72,11 @@ int main(int argc, char * argv[]) { DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); } } -#ifndef INPUT_NOLOCK - playerLock.post(); - playerLock.unlink(); - playerLock.close(); -#endif + if (conv.needsLock()){ + playerLock.post(); + playerLock.unlink(); + playerLock.close(); + } } return 0; } diff --git a/src/io.cpp b/src/io.cpp index 9f84ea97..595b6c6f 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -559,10 +559,11 @@ namespace Mist { bufferNext(packet, myMeta); } - void InOutBase::continueNegotiate(unsigned long tid) { - nProxy.continueNegotiate(tid, myMeta); + void InOutBase::continueNegotiate(unsigned long tid, bool quickNegotiate) { + nProxy.continueNegotiate(tid, myMeta, quickNegotiate); } - void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta) { + + void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate) { if (!tid) { return; } @@ -618,12 +619,49 @@ namespace Mist { unsigned long offset = 6 * trackOffset[tid]; //If we have a new track to negotiate if (!trackState.count(tid)) { - INFO_MSG("Starting negotiation for incoming track %lu, at offset %lu", tid, trackOffset[tid]); memset(tmp + offset, 0, 4); - tmp[offset] = 0x80; - tmp[offset + 4] = ((tid >> 8) & 0xFF); - tmp[offset + 5] = (tid & 0xFF); - trackState[tid] = FILL_NEW; + if (quickNegotiate){ + + unsigned long finalTid = getpid() + tid; + unsigned short firstPage = 1; + INFO_MSG("HANDLING quick negotiation for track %d ~> %d", tid, finalTid) + MEDIUM_MSG("Buffer has indicated that incoming track %lu should start writing on track %lu, page %lu", tid, finalTid, firstPage); + trackMap[tid] = finalTid; + if (myMeta.tracks.count(finalTid) && myMeta.tracks[finalTid].lastms){ + myMeta.tracks[finalTid].lastms = 0; + } + trackState[tid] = FILL_ACC; + + + + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), finalTid); + metaPages[tid].init(pageName, 8 * 1024 * 1024, true); + metaPages[tid].master = false; + DTSC::Meta tmpMeta; + tmpMeta.tracks[finalTid] = myMeta.tracks[tid]; + tmpMeta.tracks[finalTid].trackID = finalTid; + JSON::Value tmpVal = tmpMeta.toJSON(); + std::string tmpStr = tmpVal.toNetPacked(); + memcpy(metaPages[tid].mapped, tmpStr.data(), tmpStr.size()); + + + + + + snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), finalTid); + metaPages[tid].init(pageName, 8 * 1024 * 1024, true); + metaPages[tid].master = false; + Bit::htobl(tmp + offset, finalTid | 0xC0000000); + Bit::htobs(tmp + offset + 4, firstPage); + }else{ + INFO_MSG("Starting negotiation for incoming track %lu, at offset %lu", tid, trackOffset[tid]); + memset(tmp + offset, 0, 4); + tmp[offset] = 0x80; + tmp[offset + 4] = ((tid >> 8) & 0xFF); + tmp[offset + 5] = (tid & 0xFF); + trackState[tid] = FILL_NEW; + } return; } #if defined(__CYGWIN__) || defined(_WIN32) diff --git a/src/io.h b/src/io.h index c2103701..438be96f 100644 --- a/src/io.h +++ b/src/io.h @@ -59,7 +59,7 @@ namespace Mist { std::map iVecs; IPC::sharedPage encryptionPage; - void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta); + void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate = false); }; ///\brief Class containing all basic input and output functions. @@ -74,7 +74,7 @@ namespace Mist { void bufferLivePacket(JSON::Value & packet); void bufferLivePacket(DTSC::Packet & packet); protected: - void continueNegotiate(unsigned long tid); + void continueNegotiate(unsigned long tid, bool quickNegotiate = false); diff --git a/src/output/output.cpp b/src/output/output.cpp index 35d562bf..ac81a026 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -278,6 +278,16 @@ namespace Mist { onFail(); return; } + if (!source.size()){ + std::string strName = streamName; + Util::sanitizeName(strName); + IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities + IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + configLock.wait(); + DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); + source = streamCfg.getMember("source").asString(); + configLock.post(); + } char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); nProxy.metaPages.clear(); @@ -416,9 +426,20 @@ namespace Mist { // when we don't see this explicitly it makes debugging the recording feature // a bit painfull :) if (selectedTracks.size() == 0) { - WARN_MSG("We didn't find any tracks which that we can use. selectedTrack.size() is 0."); + INSANE_MSG("We didn't find any tracks which that we can use. selectedTrack.size() is 0."); for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - WARN_MSG("Found track/codec: %s", trit->second.codec.c_str()); + INSANE_MSG("Found track/codec: %s", trit->second.codec.c_str()); + } + if (!myMeta.tracks.size() && (source.find("dtsc://") == 0)){ + //Wait 5 seconds and try again. Keep a counter, try at most 3 times + static int counter = 0; + if (counter++ < 10){ + Util::wait(1000); + nProxy.userClient.keepAlive(); + stats(); + updateMeta(); + selectDefaultTracks(); + } } } /*end-roxlu*/ @@ -898,6 +919,25 @@ namespace Mist { } if ( !sentHeader){ DEBUG_MSG(DLVL_DONTEVEN, "sendHeader"); + bool waitLonger = false; + if (!myMeta.tracks.size()){ + waitLonger = true; + }else{ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (!it->second.keys.size()){ + waitLonger = true; + break; + } + } + } + if (waitLonger){ + updateMeta(); + Util::sleep(1000); + static unsigned int metaTries = 0; + if(++metaTries < 7){ + continue; + } + } sendHeader(); } prepareNext(); diff --git a/src/output/output.h b/src/output/output.h index d2b541ec..f9443091 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -108,6 +108,7 @@ namespace Mist { bool sought;///= fastAsPossibleTime){ + realTime = 1000; + } + if (thisPacket.getFlag("keyframe")){ + std::set availableTracks; + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (it->second.type == "video" || it->second.type == "audio"){ + availableTracks.insert(it->first); + } + } + if (availableTracks != selectedTracks){ + //reset, resendheader + JSON::Value prep; + prep["cmd"] = "reset"; + /// \todo Make this securererer. + unsigned long sendSize = prep.packedSize(); + myConn.SendNow("DTCM"); + char sSize[4] = {0, 0, 0, 0}; + Bit::htobl(sSize, prep.packedSize()); + myConn.SendNow(sSize, 4); + prep.sendTo(myConn); + } + } myConn.SendNow(thisPacket.getData(), thisPacket.getDataLen()); } void OutDTSC::sendHeader(){ sentHeader = true; - myMeta.send(myConn, true); + selectedTracks.clear(); + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (it->second.type == "video" || it->second.type == "audio"){ + selectedTracks.insert(it->first); + } + } + myMeta.send(myConn, true, selectedTracks); + if (myMeta.live){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (!fastAsPossibleTime || it->second.lastms < fastAsPossibleTime){ + fastAsPossibleTime = it->second.lastms; + realTime = 0; + } + } + }else{ + fastAsPossibleTime = 50000;//50 seconds + realTime = 0; + } } void OutDTSC::onRequest(){ @@ -76,6 +117,8 @@ namespace Mist { streamName = dScan.getMember("stream").asString(); Util::sanitizeName(streamName); parseData = true; + INFO_MSG("Handled play for stream %s", streamName.c_str()); + setBlocking(false); } void OutDTSC::handlePush(DTSC::Scan & dScan){ diff --git a/src/output/output_dtsc.h b/src/output/output_dtsc.h index ba01630b..52103c5d 100644 --- a/src/output/output_dtsc.h +++ b/src/output/output_dtsc.h @@ -15,6 +15,7 @@ namespace Mist { bool pushing; void handlePush(DTSC::Scan & dScan); void handlePlay(DTSC::Scan & dScan); + unsigned long long fastAsPossibleTime; }; } diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index 0ed8f70c..68f22e95 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -6,6 +6,27 @@ namespace Mist { ///\brief Builds an index file for HTTP Live streaming. ///\return The index file for HTTP Live Streaming. std::string OutHLS::liveIndex() { + + static int timer = 0; + bool checkWait = true; + while (checkWait && ++timer < 10){ + checkWait = false; + if (!myMeta.tracks.size()){ + checkWait = true; + } + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (it->second.keys.size() <= 3){ + checkWait = true; + break; + } + } + if (checkWait){ + Util::sleep(500); + INFO_MSG("SLeeping timer %d", timer); + updateMeta(); + } + } + std::stringstream result; result << "#EXTM3U\r\n"; int audioId = -1; diff --git a/src/output/output_progressive_mp4.cpp b/src/output/output_progressive_mp4.cpp index 9b226ccc..931c01f4 100644 --- a/src/output/output_progressive_mp4.cpp +++ b/src/output/output_progressive_mp4.cpp @@ -43,6 +43,9 @@ namespace Mist { ///\todo This function does not indicate errors anywhere... maybe fix this... std::string OutProgressiveMP4::DTSCMeta2MP4Header(long long & size, int fragmented) { + if (myMeta.live){ + completeKeysOnly = true; + } //Make sure we have a proper being value for the size... size = 0; //Stores the result of the function @@ -745,6 +748,28 @@ namespace Mist { void OutProgressiveMP4::setvidTrack() { vidTrack = 0; + static int timer = 0; + bool checkWait = true; + while (checkWait && ++timer < 10){ + checkWait = false; + if (!myMeta.tracks.size()){ + checkWait = true; + } + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (!it->second.keys.size()){ + checkWait = true; + break; + } + } + if (checkWait){ + Util::sleep(500); + updateMeta(); + } + } + + if (!selectedTracks.size()){ + selectDefaultTracks(); + } for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) { //Find video track if (myMeta.tracks[*it].type == "video") {