diff --git a/src/input/input.cpp b/src/input/input.cpp index 2203394d..cfaf6b0c 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -125,6 +125,8 @@ namespace Mist { if (!streamName.size()) { convert(); + } else if (!needsLock()) { + stream(); }else{ serve(); } @@ -195,6 +197,78 @@ namespace Mist { //end player functionality } + /// Main loop for stream-style inputs. + /// This loop will start the buffer without resume support, and then repeatedly call ..... followed by .... + void Input::stream(){ + IPC::semaphore pullLock; + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock.tryWait()){ + DEBUG_MSG(DLVL_DEVEL, "A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); + return; + } + if (Util::streamAlive(streamName)){ + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + return; + } + if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + return; + } + + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); + + DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); + + if (!openStreamSource()){ + FAIL_MSG("Unable to connect to source"); + pullLock.post(); + pullLock.close(); + return; + } + parseStreamHeader(); + + if (myMeta.tracks.size() == 0){ + nProxy.userClient.finish(); + finish(); + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + return; + } + nProxy.userClient.countAsViewer = false; + + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + it->second.firstms = 0; + it->second.lastms = 0; + } + + getNext(); + unsigned long long lastTime = Util::getMS(); + unsigned long long lastActive = Util::getMS(); + while (thisPacket && config->is_active && nProxy.userClient.isAlive()){ + nProxy.bufferLivePacket(thisPacket, myMeta); + getNext(); + nProxy.userClient.keepAlive(); + } + + closeStreamSource(); + + nProxy.userClient.finish(); + finish(); + pullLock.post(); + pullLock.close(); + pullLock.unlink(); + DEBUG_MSG(DLVL_DEVEL, "Pull input for stream %s closing clean", streamName.c_str()); + return; + } + void Input::finish(){ for( std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){ for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ diff --git a/src/input/input.h b/src/input/input.h index 1c5c2df5..25589429 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -45,6 +45,7 @@ namespace Mist { virtual void userCallback(char * data, size_t len, unsigned int id); virtual void convert(); virtual void serve(); + virtual void stream(); virtual void parseHeader(); diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 7871a951..a7e9592b 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -31,6 +31,24 @@ namespace Mist { capa["optional"]["DVR"]["option"] = "--buffer"; capa["optional"]["DVR"]["type"] = "uint"; capa["optional"]["DVR"]["default"] = 50000LL; + + option["arg"] = "integer"; + option["long"] = "resume"; + option["short"] = "R"; + option["help"] = "Enable resuming support (1) or disable resuming support (0, default)"; + option["value"].append(0LL); + config->addOption("resume", option); + capa["optional"]["resume"]["name"] = "Resume support"; + capa["optional"]["resume"]["help"] = "If enabled, the buffer will linger after source disconnect to allow resuming the stream later. If disabled, the buffer will instantly close on source disconnect."; + capa["optional"]["resume"]["option"] = "--resume"; + capa["optional"]["resume"]["type"] = "select"; + capa["optional"]["resume"]["select"][0u][0u] = "0"; + capa["optional"]["resume"]["select"][0u][1u] = "Disabled"; + capa["optional"]["resume"]["select"][1u][0u] = "1"; + capa["optional"]["resume"]["select"][1u][1u] = "Enabled"; + capa["optional"]["resume"]["default"] = 0LL; + option.null(); + capa["source_match"] = "push://*"; capa["priority"] = 9ll; capa["desc"] = "Provides buffered live input"; @@ -406,6 +424,64 @@ namespace Mist { //Track is set to "New track request", assign new track id and create shared memory page //This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process if (value & 0x80000000) { + if (value & 0x40000000) { + unsigned long finalMap = value & ~0xC0000000; + //Register the new track as an active track. + activeTracks.insert(finalMap); + //Register the time of registration as initial value for the lastUpdated field, plus an extra 5 seconds just to be sure. + lastUpdated[finalMap] = Util::bootSecs() + 5; + //Register the user thats is pushing this element + pushLocation[finalMap] = data; + //Initialize the metadata for this track + if (!myMeta.tracks.count(finalMap)) { + DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap); + + IPC::sharedPage tMeta; + + char tempMetaName[NAME_BUFFER_SIZE]; + snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), finalMap); + tMeta.init(tempMetaName, 8388608, false); + + //The page exist, now we try to read in the metadata of the track + + //Store the size of the dtsc packet to read. + unsigned int len = ntohl(((int *)tMeta.mapped)[1]); + //Temporary variable, won't be used again + unsigned int tempForReadingMeta = 0; + //Read in the metadata through a temporary JSON object + ///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately + JSON::Value tempJSONForMeta; + JSON::fromDTMI((const unsigned char *)tMeta.mapped + 8, len, tempForReadingMeta, tempJSONForMeta); + + tMeta.master = true; + + //Construct a metadata object for the current track + DTSC::Meta trackMeta(tempJSONForMeta); + + myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second; + myMeta.tracks[finalMap].firstms = 0; + myMeta.tracks[finalMap].lastms = 0; + + userConn.setTrackId(index, finalMap); + userConn.setKeynum(index, 0x0000); + + + char firstPage[NAME_BUFFER_SIZE]; + snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); + nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false); + + //Update the metadata for this track + updateTrackMeta(finalMap); + hasPush = true; + } + //Write the final mapped track number and keyframe number to the user page element + //This is used to resume pushing as well as pushing new tracks + userConn.setTrackId(index, finalMap); + userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size()); + //Update the metadata to reflect all changes + updateMeta(); + continue; + } //Set the temporary track id for this item, and increase the temporary value for use with the next track unsigned long long tempMapping = nextTempId++; //Add the temporary track id to the list of tracks that are currently being negotiated @@ -646,6 +722,24 @@ namespace Mist { bufferTime = tmpNum; } + //if stream is configured and setting is present, use it, always + if (streamCfg && streamCfg.getMember("resume")) { + tmpNum = streamCfg.getMember("resume").asInt(); + } else { + if (streamCfg) { + //otherwise, if stream is configured use the default + tmpNum = config->getOption("resume", true)[0u].asInt(); + } else { + //if not, use the commandline argument + tmpNum = config->getOption("resume").asInt(); + } + } + //if the new value is different, print a message and apply it + if (resumeMode != (bool)tmpNum) { + DEBUG_MSG(DLVL_DEVEL, "Setting resume mode from %s to new value of %s", resumeMode ? "enabled" : "disabled", tmpNum ? "enabled" : "disabled"); + resumeMode = tmpNum; + } + configLock.post(); configLock.close(); return true;