Added resume support and quick negotiation support from Pro, as well as support for stream-type inputs.
This commit is contained in:
		
							parent
							
								
									d4e2654f24
								
							
						
					
					
						commit
						837b2b5d4f
					
				
					 3 changed files with 169 additions and 0 deletions
				
			
		|  | @ -125,6 +125,8 @@ namespace Mist { | |||
|      | ||||
|     if (!streamName.size()) { | ||||
|       convert(); | ||||
|     } else if (!needsLock()) { | ||||
|       stream(); | ||||
|     }else{ | ||||
|       serve(); | ||||
|     } | ||||
|  | @ -195,6 +197,78 @@ namespace Mist { | |||
|     //end player functionality
 | ||||
|   } | ||||
| 
 | ||||
|   /// Main loop for stream-style inputs.
 | ||||
|   /// This loop will start the buffer without resume support, and then repeatedly call ..... followed by ....
 | ||||
|   void Input::stream(){ | ||||
|     IPC::semaphore pullLock; | ||||
|     pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); | ||||
|     if (!pullLock.tryWait()){ | ||||
|       DEBUG_MSG(DLVL_DEVEL, "A pull process for stream %s is already running", streamName.c_str()); | ||||
|       pullLock.close(); | ||||
|       return; | ||||
|     } | ||||
|     if (Util::streamAlive(streamName)){ | ||||
|       pullLock.post(); | ||||
|       pullLock.close(); | ||||
|       pullLock.unlink(); | ||||
|       return; | ||||
|     } | ||||
|     if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer
 | ||||
|       pullLock.post(); | ||||
|       pullLock.close(); | ||||
|       pullLock.unlink(); | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     char userPageName[NAME_BUFFER_SIZE]; | ||||
|     snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); | ||||
|     nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); | ||||
| 
 | ||||
|     DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); | ||||
| 
 | ||||
|     if (!openStreamSource()){ | ||||
|       FAIL_MSG("Unable to connect to source"); | ||||
|       pullLock.post(); | ||||
|       pullLock.close(); | ||||
|       return; | ||||
|     } | ||||
|     parseStreamHeader(); | ||||
|      | ||||
|     if (myMeta.tracks.size() == 0){ | ||||
|       nProxy.userClient.finish(); | ||||
|       finish(); | ||||
|       pullLock.post(); | ||||
|       pullLock.close(); | ||||
|       pullLock.unlink(); | ||||
|       return; | ||||
|     } | ||||
|     nProxy.userClient.countAsViewer = false; | ||||
| 
 | ||||
|     for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ | ||||
|       it->second.firstms = 0; | ||||
|       it->second.lastms = 0; | ||||
|     } | ||||
| 
 | ||||
|     getNext(); | ||||
|     unsigned long long lastTime = Util::getMS(); | ||||
|     unsigned long long lastActive = Util::getMS(); | ||||
|     while (thisPacket && config->is_active && nProxy.userClient.isAlive()){ | ||||
|       nProxy.bufferLivePacket(thisPacket, myMeta); | ||||
|       getNext(); | ||||
|       nProxy.userClient.keepAlive(); | ||||
|     } | ||||
|      | ||||
|     closeStreamSource(); | ||||
| 
 | ||||
|     nProxy.userClient.finish(); | ||||
|     finish(); | ||||
|     pullLock.post(); | ||||
|     pullLock.close(); | ||||
|     pullLock.unlink(); | ||||
|     DEBUG_MSG(DLVL_DEVEL, "Pull input for stream %s closing clean", streamName.c_str()); | ||||
|     return; | ||||
|   } | ||||
| 
 | ||||
|   void Input::finish(){ | ||||
|     for( std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){ | ||||
|       for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ | ||||
|  |  | |||
|  | @ -45,6 +45,7 @@ namespace Mist { | |||
|       virtual void userCallback(char * data, size_t len, unsigned int id); | ||||
|       virtual void convert(); | ||||
|       virtual void serve(); | ||||
|       virtual void stream(); | ||||
|        | ||||
|        | ||||
|       virtual void parseHeader(); | ||||
|  |  | |||
|  | @ -31,6 +31,24 @@ namespace Mist { | |||
|     capa["optional"]["DVR"]["option"] = "--buffer"; | ||||
|     capa["optional"]["DVR"]["type"] = "uint"; | ||||
|     capa["optional"]["DVR"]["default"] = 50000LL; | ||||
| 
 | ||||
|     option["arg"] = "integer"; | ||||
|     option["long"] = "resume"; | ||||
|     option["short"] = "R"; | ||||
|     option["help"] = "Enable resuming support (1) or disable resuming support (0, default)"; | ||||
|     option["value"].append(0LL); | ||||
|     config->addOption("resume", option); | ||||
|     capa["optional"]["resume"]["name"] = "Resume support"; | ||||
|     capa["optional"]["resume"]["help"] = "If enabled, the buffer will linger after source disconnect to allow resuming the stream later. If disabled, the buffer will instantly close on source disconnect."; | ||||
|     capa["optional"]["resume"]["option"] = "--resume"; | ||||
|     capa["optional"]["resume"]["type"] = "select"; | ||||
|     capa["optional"]["resume"]["select"][0u][0u] = "0"; | ||||
|     capa["optional"]["resume"]["select"][0u][1u] = "Disabled"; | ||||
|     capa["optional"]["resume"]["select"][1u][0u] = "1"; | ||||
|     capa["optional"]["resume"]["select"][1u][1u] = "Enabled"; | ||||
|     capa["optional"]["resume"]["default"] = 0LL; | ||||
|     option.null(); | ||||
| 
 | ||||
|     capa["source_match"] = "push://*"; | ||||
|     capa["priority"] = 9ll; | ||||
|     capa["desc"] = "Provides buffered live input"; | ||||
|  | @ -406,6 +424,64 @@ namespace Mist { | |||
|       //Track is set to "New track request", assign new track id and create shared memory page
 | ||||
|       //This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process
 | ||||
|       if (value & 0x80000000) { | ||||
|         if (value & 0x40000000) { | ||||
|           unsigned long finalMap = value & ~0xC0000000; | ||||
|           //Register the new track as an active track.
 | ||||
|           activeTracks.insert(finalMap); | ||||
|           //Register the time of registration as initial value for the lastUpdated field, plus an extra 5 seconds just to be sure.
 | ||||
|           lastUpdated[finalMap] = Util::bootSecs() + 5; | ||||
|           //Register the user thats is pushing this element
 | ||||
|           pushLocation[finalMap] = data; | ||||
|           //Initialize the metadata for this track
 | ||||
|           if (!myMeta.tracks.count(finalMap)) { | ||||
|             DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap); | ||||
|              | ||||
|             IPC::sharedPage tMeta; | ||||
| 
 | ||||
|             char tempMetaName[NAME_BUFFER_SIZE]; | ||||
|             snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), finalMap); | ||||
|             tMeta.init(tempMetaName, 8388608, false); | ||||
| 
 | ||||
|             //The page exist, now we try to read in the metadata of the track
 | ||||
| 
 | ||||
|             //Store the size of the dtsc packet to read.
 | ||||
|             unsigned int len = ntohl(((int *)tMeta.mapped)[1]); | ||||
|             //Temporary variable, won't be used again
 | ||||
|             unsigned int tempForReadingMeta = 0; | ||||
|             //Read in the metadata through a temporary JSON object
 | ||||
|             ///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately
 | ||||
|             JSON::Value tempJSONForMeta; | ||||
|             JSON::fromDTMI((const unsigned char *)tMeta.mapped + 8, len, tempForReadingMeta, tempJSONForMeta); | ||||
|              | ||||
|             tMeta.master = true; | ||||
| 
 | ||||
|             //Construct a metadata object for the current track
 | ||||
|             DTSC::Meta trackMeta(tempJSONForMeta); | ||||
| 
 | ||||
|             myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second; | ||||
|             myMeta.tracks[finalMap].firstms = 0; | ||||
|             myMeta.tracks[finalMap].lastms = 0; | ||||
| 
 | ||||
|             userConn.setTrackId(index, finalMap); | ||||
|             userConn.setKeynum(index, 0x0000); | ||||
| 
 | ||||
| 
 | ||||
|             char firstPage[NAME_BUFFER_SIZE]; | ||||
|             snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); | ||||
|             nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false); | ||||
| 
 | ||||
|             //Update the metadata for this track
 | ||||
|             updateTrackMeta(finalMap); | ||||
|             hasPush = true; | ||||
|           } | ||||
|           //Write the final mapped track number and keyframe number to the user page element
 | ||||
|           //This is used to resume pushing as well as pushing new tracks
 | ||||
|           userConn.setTrackId(index, finalMap); | ||||
|           userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size()); | ||||
|           //Update the metadata to reflect all changes
 | ||||
|           updateMeta(); | ||||
|           continue; | ||||
|         } | ||||
|         //Set the temporary track id for this item, and increase the temporary value for use with the next track
 | ||||
|         unsigned long long tempMapping = nextTempId++; | ||||
|         //Add the temporary track id to the list of tracks that are currently being negotiated
 | ||||
|  | @ -646,6 +722,24 @@ namespace Mist { | |||
|       bufferTime = tmpNum; | ||||
|     } | ||||
| 
 | ||||
|     //if stream is configured and setting is present, use it, always
 | ||||
|     if (streamCfg && streamCfg.getMember("resume")) { | ||||
|       tmpNum = streamCfg.getMember("resume").asInt(); | ||||
|     } else { | ||||
|       if (streamCfg) { | ||||
|         //otherwise, if stream is configured use the default
 | ||||
|         tmpNum = config->getOption("resume", true)[0u].asInt(); | ||||
|       } else { | ||||
|         //if not, use the commandline argument
 | ||||
|         tmpNum = config->getOption("resume").asInt(); | ||||
|       } | ||||
|     } | ||||
|     //if the new value is different, print a message and apply it
 | ||||
|     if (resumeMode != (bool)tmpNum) { | ||||
|       DEBUG_MSG(DLVL_DEVEL, "Setting resume mode from %s to new value of %s", resumeMode ? "enabled" : "disabled", tmpNum ? "enabled" : "disabled"); | ||||
|       resumeMode = tmpNum; | ||||
|     } | ||||
| 
 | ||||
|     configLock.post(); | ||||
|     configLock.close(); | ||||
|     return true; | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Thulinma
						Thulinma