HLS input converted to URIReader, temporarily disabled clearkey encryptionHLS input fixes:
- Re-enabled file-based input - Added support for clearkey-encrypted file-based input - Converted to HTTP::URIReader for all input types - Both live and VoD functional support
This commit is contained in:
		
							parent
							
								
									61b0a90598
								
							
						
					
					
						commit
						19f1df18aa
					
				
					 3 changed files with 224 additions and 157 deletions
				
			
		|  | @ -250,7 +250,7 @@ namespace TS{ | |||
|     uint64_t firstTime = 0xffffffffffffffffull, lastTime = 0; | ||||
|     for (std::map<size_t, uint32_t>::const_iterator it = pidToCodec.begin(); | ||||
|          it != pidToCodec.end(); it++){ | ||||
|       if (!hasPacket(it->first)){ | ||||
|       if (!hasPacket(it->first) || !outPackets.count(it->first) || !outPackets.at(it->first).size()){ | ||||
|         missing++; | ||||
|       }else{ | ||||
|         if (outPackets.at(it->first).front().getTime() < firstTime){ | ||||
|  |  | |||
|  | @ -114,7 +114,7 @@ namespace Mist{ | |||
|   // These are used in the HTTP::Downloader callback, to prevent timeouts when downloading
 | ||||
|   // segments/playlists.
 | ||||
|   inputHLS *self = 0; | ||||
|   bool callbackFunc(){return self->callback();} | ||||
|   bool callbackFunc(uint8_t){return self->callback();} | ||||
| 
 | ||||
|   /// Called by the global callbackFunc, to prevent timeouts
 | ||||
|   bool inputHLS::callback(){ | ||||
|  | @ -160,12 +160,6 @@ namespace Mist{ | |||
|     MEDIUM_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); | ||||
|   } | ||||
| 
 | ||||
|   SegmentDownloader::SegmentDownloader(){ | ||||
|     segDL.progressCallback = callbackFunc; | ||||
|     segDL.dataTimeout = 5; | ||||
|     segDL.retryCount = 5; | ||||
|   } | ||||
| 
 | ||||
|   Playlist::Playlist(const std::string &uriSrc){ | ||||
|     nextUTC = 0; | ||||
|     id = 0; // to be set later
 | ||||
|  | @ -184,6 +178,9 @@ namespace Mist{ | |||
|     reloadNext = 0; | ||||
|   } | ||||
| 
 | ||||
|   /// Returns true if there is no protocol defined in the playlist root URL.
 | ||||
|   bool Playlist::isUrl() const{return root.protocol.size();} | ||||
| 
 | ||||
|   void parseKey(std::string key, char *newKey, unsigned int len){ | ||||
|     memset(newKey, 0, len); | ||||
|     for (size_t i = 0; i < key.size() && i < (len << 1); ++i){ | ||||
|  | @ -213,13 +210,93 @@ namespace Mist{ | |||
|     return output; | ||||
|   } | ||||
| 
 | ||||
|   /// Returns true if packetPtr is at the end of the current segment.
 | ||||
|   bool SegmentDownloader::atEnd() const{ | ||||
|     return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size(); | ||||
|   SegmentDownloader::SegmentDownloader(){ | ||||
|     segDL.onProgress(callbackFunc); | ||||
|     encrypted = false; | ||||
|   } | ||||
| 
 | ||||
|   /// Returns true if there is no protocol defined in the playlist root URL.
 | ||||
|   bool Playlist::isUrl() const{return root.protocol.size();} | ||||
|   /// Returns true if packetPtr is at the end of the current segment.
 | ||||
|   bool SegmentDownloader::atEnd() const{ | ||||
|     return segDL.isEOF(); | ||||
|     // return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size();
 | ||||
|   } | ||||
| 
 | ||||
|   /// Attempts to read a single TS packet from the current segment, setting packetPtr on success
 | ||||
|   bool SegmentDownloader::readNext(){ | ||||
|     if (encrypted){ | ||||
|       // Encrypted, need to decrypt
 | ||||
| #ifdef SSL | ||||
|       // Are we exactly at the end of the buffer? Truncate it entirely.
 | ||||
|       if (encOffset == outData.size() || !outData.size()){ | ||||
|         outData.truncate(0); | ||||
|         packetPtr = 0; | ||||
|         encOffset = 0; | ||||
|       } | ||||
|       // Do we already have some data ready? Just serve it.
 | ||||
|       if (encOffset + 188 <= outData.size()){ | ||||
|         packetPtr = outData + encOffset; | ||||
|         encOffset += 188; | ||||
|         if (packetPtr[0] != 0x47){ | ||||
|           FAIL_MSG("Not TS! Starts with byte %" PRIu8, (uint8_t)packetPtr[0]); | ||||
|           return false; | ||||
|         } | ||||
|         return true; | ||||
|       } | ||||
|       // Alright, we need to read some more data.
 | ||||
|       // We read 192 bytes at a time: a single TS packet is 188 bytes but AES-128-CBC encryption works in 16-byte blocks.
 | ||||
|       size_t len = 0; | ||||
|       segDL.readSome(packetPtr, len, 192); | ||||
|       if (!len){return false;} | ||||
|       if (len % 16 != 0){ | ||||
|         FAIL_MSG("Read a non-16-multiple of bytes (%zu), cannot decode!", len); | ||||
|         return false; | ||||
|       } | ||||
|       outData.allocate(outData.size() + len); | ||||
|       mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, len, tmpIvec, (const unsigned char *)packetPtr, | ||||
|                             ((unsigned char *)(char *)outData) + outData.size()); | ||||
|       outData.append(0, len); | ||||
|       // End of the segment? Remove padding data.
 | ||||
|       if (segDL.isEOF()){ | ||||
|         // The padding consists of X bytes of padding, all containing the raw value X.
 | ||||
|         // Since padding is mandatory, we can simply read the last byte and remove X bytes from the length.
 | ||||
|         if (outData.size() <= outData[outData.size() - 1]){ | ||||
|           FAIL_MSG("Encryption padding is >= one TS packet. We probably returned some invalid TS " | ||||
|                    "data earlier :-("); | ||||
|           return false; | ||||
|         } | ||||
|         outData.truncate(outData.size() - outData[outData.size() - 1]); | ||||
|       } | ||||
|       // Okay, we have more data. Let's see if we can return it...
 | ||||
|       if (encOffset + 188 <= outData.size()){ | ||||
|         packetPtr = outData + encOffset; | ||||
|         encOffset += 188; | ||||
|         if (packetPtr[0] != 0x47){ | ||||
|           FAIL_MSG("Not TS! Starts with byte %" PRIu8, (uint8_t)packetPtr[0]); | ||||
|           return false; | ||||
|         } | ||||
|         return true; | ||||
|       } | ||||
| #endif | ||||
|       // No? Then we've failed in our task :'(
 | ||||
|       FAIL_MSG("Could not load encrypted packet :'("); | ||||
|       return false; | ||||
|     }else{ | ||||
|       // Plaintext
 | ||||
|       size_t len = 0; | ||||
|       segDL.readSome(packetPtr, len, 188); | ||||
|       if (len != 188 || packetPtr[0] != 0x47){ | ||||
|         FAIL_MSG("Not a valid TS packet: len %zu, first byte %" PRIu8, len, (uint8_t)packetPtr[0]); | ||||
|         return false; | ||||
|       } | ||||
|       return true; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   /// Attempts to read a single TS packet from the current segment, setting packetPtr on success
 | ||||
|   void SegmentDownloader::close(){ | ||||
|     packetPtr = 0; | ||||
|     segDL.close(); | ||||
|   } | ||||
| 
 | ||||
|   /// Loads the given segment URL into the segment buffer.
 | ||||
|   bool SegmentDownloader::loadSegment(const playListEntries &entry){ | ||||
|  | @ -228,73 +305,30 @@ namespace Mist{ | |||
| 
 | ||||
|     MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(), | ||||
|                hexIvec.c_str()); | ||||
|     if (!segDL.get(entry.filename)){ | ||||
|       FAIL_MSG("failed download: %s", entry.filename.c_str()); | ||||
|     if (!segDL.open(entry.filename)){ | ||||
|       FAIL_MSG("Could not open %s", entry.filename.c_str()); | ||||
|       return false; | ||||
|     } | ||||
| 
 | ||||
|     if (!segDL.isOk()){ | ||||
|       FAIL_MSG("HTTP response not OK!. statuscode: %d, statustext: %s", segDL.getStatusCode(), | ||||
|                segDL.getStatusText().c_str()); | ||||
|       return false; | ||||
|     } | ||||
| 
 | ||||
|     if (segDL.getHeader("Content-Length") != ""){ | ||||
|       if (segDL.data().size() != atoi(segDL.getHeader("Content-Length").c_str())){ | ||||
|         FAIL_MSG("Expected %s bytes of data, but only received %lu.", | ||||
|                  segDL.getHeader("Content-Length").c_str(), segDL.data().size()); | ||||
|         return false; | ||||
|       } | ||||
|     } | ||||
|     if (!segDL){return false;} | ||||
| 
 | ||||
|     encrypted = false; | ||||
|     outData.truncate(0); | ||||
|     // If we have a non-null key, decrypt
 | ||||
|     if (entry.keyAES[0] != 0 || entry.keyAES[1] != 0 || entry.keyAES[2] != 0 || entry.keyAES[3] != 0 || | ||||
|         entry.keyAES[4] != 0 || entry.keyAES[5] != 0 || entry.keyAES[6] != 0 || entry.keyAES[7] != 0 || | ||||
|         entry.keyAES[8] != 0 || entry.keyAES[9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 || | ||||
|         entry.keyAES[12] != 0 || entry.keyAES[13] != 0 || entry.keyAES[14] != 0 || entry.keyAES[15] != 0){ | ||||
|       // Setup AES context
 | ||||
|       mbedtls_aes_context aes; | ||||
|       // Load key for decryption
 | ||||
|       encrypted = true; | ||||
| #ifdef SSL | ||||
|       // Load key
 | ||||
|       mbedtls_aes_setkey_dec(&aes, (const unsigned char *)entry.keyAES, 128); | ||||
|       // Allocate a pointer for writing the decrypted data to
 | ||||
|       static Util::ResizeablePointer outdata; | ||||
|       outdata.allocate(segDL.data().size()); | ||||
|       // Actually decrypt the data
 | ||||
|       unsigned char tmpIvec[16]; | ||||
|       // Load initialization vector
 | ||||
|       memcpy(tmpIvec, entry.ivec, 16); | ||||
| 
 | ||||
|       mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, segDL.data().size(), tmpIvec, | ||||
|                             (const unsigned char *)segDL.data().data(), (unsigned char *)(char *)outdata); | ||||
|       // Data is now still padded, the padding consists of X bytes of padding, all containing the raw value X.
 | ||||
|       // Since padding is mandatory, we can simply read the last byte and remove X bytes from the length.
 | ||||
|       if (segDL.data().size() <= outdata[segDL.data().size() - 1]){ | ||||
|         FAIL_MSG("Encryption padding is >= entire segment. Considering download failed."); | ||||
|         return false; | ||||
|       } | ||||
|       size_t newSize = segDL.data().size() - outdata[segDL.data().size() - 1]; | ||||
|       // Finally, overwrite the original data buffer with the new one
 | ||||
|       segDL.data().assign(outdata, newSize); | ||||
| #endif | ||||
|     } | ||||
| 
 | ||||
|     // check first byte = 0x47. begin of ts file, then check if it is a multiple of 188bytes
 | ||||
|     if (segDL.data().data()[0] == 0x47){ | ||||
|       if (segDL.data().size() % 188){ | ||||
|         FAIL_MSG("Expected a multiple of 188 bytes, received %zu bytes. url: %s", | ||||
|                  segDL.data().size(), entry.filename.c_str()); | ||||
|         return false; | ||||
|       } | ||||
|     }else if (segDL.data().data()[5] == 0x47){ | ||||
|       if (segDL.data().size() % 192){ | ||||
|         FAIL_MSG("Expected a multiple of 192 bytes, received %zu bytes. url: %s", | ||||
|                  segDL.data().size(), entry.filename.c_str()); | ||||
|         return false; | ||||
|       } | ||||
|     }else{ | ||||
|       FAIL_MSG("Segment does not appear to contain TS data. Considering download failed."); | ||||
|       return false; | ||||
|     } | ||||
| 
 | ||||
|     packetPtr = segDL.data().data(); | ||||
|     packetPtr = 0; | ||||
|     HIGH_MSG("Segment download complete and passed sanity checks"); | ||||
|     return true; | ||||
|   } | ||||
|  | @ -359,16 +393,23 @@ namespace Mist{ | |||
|           keyUri = val.substr(tmpPos + 5, tmpPos2); | ||||
| 
 | ||||
|           tmpPos = val.find("IV="); | ||||
|           keyIV = val.substr(tmpPos + 5, 32); | ||||
|           if (tmpPos != std::string::npos){keyIV = val.substr(tmpPos + 5, 32);} | ||||
| 
 | ||||
|           // when key not found, download and store it in the map
 | ||||
|           if (!keys.count(keyUri)){ | ||||
|             HTTP::Downloader keyDL; | ||||
|             if (!keyDL.get(root.link(keyUri)) || !keyDL.isOk()){ | ||||
|             HTTP::URIReader keyDL; | ||||
|             if (!keyDL.open(root.link(keyUri)) || !keyDL){ | ||||
|               FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str()); | ||||
|               continue; | ||||
|             } | ||||
|             keys.insert(std::pair<std::string, std::string>(keyUri, keyDL.data())); | ||||
|             char *keyPtr; | ||||
|             size_t keyLen; | ||||
|             keyDL.readAll(keyPtr, keyLen); | ||||
|             if (!keyLen){ | ||||
|               FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str()); | ||||
|               continue; | ||||
|             } | ||||
|             keys.insert(std::pair<std::string, std::string>(keyUri, std::string(keyPtr, keyLen))); | ||||
|           } | ||||
|         } | ||||
| 
 | ||||
|  | @ -411,7 +452,12 @@ namespace Mist{ | |||
|         cleanLine(filename); | ||||
|         filename = root.link(filename).getUrl(); | ||||
|         char ivec[16]; | ||||
|         parseKey(keyIV, ivec, 16); | ||||
|         if (keyIV.size()){ | ||||
|           parseKey(keyIV, ivec, 16); | ||||
|         }else{ | ||||
|           memset(ivec, 0, 16); | ||||
|           Bit::htobll(ivec + 8, fileNo); | ||||
|         } | ||||
|         addEntry(filename, f, totalBytes, keys[keyUri], std::string(ivec, 16)); | ||||
|         lastFileIndex = fileNo + 1; | ||||
|         ++count; | ||||
|  | @ -491,8 +537,8 @@ namespace Mist{ | |||
|       // The mutex assures we have a unique count/number.
 | ||||
|       if (!id){id = listEntries.size() + 1;} | ||||
|       listEntries[id].push_back(entry); | ||||
|       MEDIUM_MSG("Added segment to variant %" PRIu32 " (#%d, now %zu queued): %s", id, | ||||
|                  lastFileIndex, listEntries[id].size(), filename.c_str()); | ||||
|       DONTEVEN_MSG("Added segment to variant %" PRIu32 " (#%" PRIu64 ", now %zu queued): %s", id, | ||||
|                    lastFileIndex, listEntries[id].size(), filename.c_str()); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  | @ -563,8 +609,8 @@ namespace Mist{ | |||
|       return; | ||||
|     } | ||||
|     myMeta = DTSC::Meta(); | ||||
|     myMeta.live = false; | ||||
|     myMeta.vod = true; | ||||
|     myMeta.live = true; | ||||
|     myMeta.vod = false; | ||||
|     INFO_MSG("Parsing live stream to create header..."); | ||||
|     TS::Packet packet; // to analyse and extract data
 | ||||
|     int counter = 1; | ||||
|  | @ -579,7 +625,6 @@ namespace Mist{ | |||
| 
 | ||||
|       for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin(); | ||||
|            entryIt != pListIt->second.end(); ++entryIt){ | ||||
|         uint64_t lastBpos = entryIt->bytePos; | ||||
|         nProxy.userClient.keepAlive(); | ||||
|         if (!segDowner.loadSegment(*entryIt)){ | ||||
|           WARN_MSG("Skipping segment that could not be loaded in an attempt to recover"); | ||||
|  | @ -588,13 +633,12 @@ namespace Mist{ | |||
|         } | ||||
| 
 | ||||
|         do{ | ||||
|           if (!packet.FromPointer(segDowner.packetPtr)){ | ||||
|             WARN_MSG("Could not load TS packet, aborting segment parse"); | ||||
|           if (!segDowner.readNext() || !packet.FromPointer(segDowner.packetPtr)){ | ||||
|             WARN_MSG("Could not load TS packet from %s, aborting segment parse", entryIt->filename.c_str()); | ||||
|             tsStream.clear(); | ||||
|             break; // Abort load
 | ||||
|           } | ||||
|           tsStream.parse(packet, lastBpos); | ||||
|           segDowner.packetPtr += 188; | ||||
|           tsStream.parse(packet, entryIt->bytePos); | ||||
| 
 | ||||
|           if (tsStream.hasPacketOnEachTrack()){ | ||||
|             while (tsStream.hasPacket()){ | ||||
|  | @ -607,15 +651,15 @@ namespace Mist{ | |||
|                 pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; | ||||
|                 pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); | ||||
|                 packetId = counter; | ||||
|                 VERYHIGH_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(), | ||||
|                              headerPack.getTrackId(), counter); | ||||
|                 VERYHIGH_MSG("Added file %s, trackid: %zu, mapped to: %d", | ||||
|                              entryIt->filename.c_str(), headerPack.getTrackId(), counter); | ||||
|                 counter++; | ||||
|               } | ||||
| 
 | ||||
|               if ((!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ | ||||
|                 tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); | ||||
|                 myMeta.tracks[packetId].minKeepAway = globalWaitTime * 2000; | ||||
|                 VERYHIGH_MSG("setting minKeepAway = %d for track: %d", | ||||
|                 VERYHIGH_MSG("setting minKeepAway = %d for track: %" PRIu64, | ||||
|                              myMeta.tracks[packetId].minKeepAway, packetId); | ||||
|               } | ||||
|             } | ||||
|  | @ -627,17 +671,13 @@ namespace Mist{ | |||
|     } | ||||
|     tsStream.clear(); | ||||
|     currentPlaylist = 0; | ||||
|     segDowner.segDL.data().clear(); // make sure we have nothing left over
 | ||||
|     segDowner.close(); // make sure we have nothing left over
 | ||||
|     INFO_MSG("header complete, beginning live ingest of %d tracks", counter - 1); | ||||
|   } | ||||
| 
 | ||||
|   bool inputHLS::readHeader(){ | ||||
|     if (streamIsLive){return true;} | ||||
| 
 | ||||
|     std::istringstream urlSource; | ||||
|     std::ifstream fileSource; | ||||
| 
 | ||||
|     bool endOfFile = false; | ||||
|     bool hasHeader = false; | ||||
| 
 | ||||
|     // See whether a separate header file exists.
 | ||||
|  | @ -664,23 +704,13 @@ namespace Mist{ | |||
|       for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin(); | ||||
|            entryIt != pListIt->second.end(); entryIt++){ | ||||
|         tsStream.partialClear(); | ||||
|         endOfFile = false; | ||||
| 
 | ||||
|         segDowner.loadSegment(*entryIt); | ||||
|         endOfFile = !segDowner.atEnd(); | ||||
|         if (!endOfFile){packet.FromPointer(segDowner.packetPtr);} | ||||
|         segDowner.packetPtr += 188; | ||||
| 
 | ||||
|         if (!segDowner.loadSegment(*entryIt)){ | ||||
|           FAIL_MSG("Failed to load segment - skipping to next"); | ||||
|           continue; | ||||
|         } | ||||
|         entId++; | ||||
|         uint64_t lastBpos = entryIt->bytePos; | ||||
|         while (!endOfFile){ | ||||
|           tsStream.parse(packet, lastBpos); | ||||
| 
 | ||||
|           // if (pListIt->isUrl()){
 | ||||
|           lastBpos = entryIt->bytePos + segDowner.segDL.data().size(); | ||||
|           //}else{
 | ||||
|           //  lastBpos = entryIt->bytePos + in.tellg();
 | ||||
|           //}
 | ||||
|         while (!segDowner.atEnd()){ | ||||
| 
 | ||||
|           while (tsStream.hasPacketOnEachTrack()){ | ||||
|             DTSC::Packet headerPack; | ||||
|  | @ -693,6 +723,8 @@ namespace Mist{ | |||
|               pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; | ||||
|               pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); | ||||
|               packetId = counter; | ||||
|               INFO_MSG("Added file %s, trackid: %zu, mapped to: %d", entryIt->filename.c_str(), | ||||
|                        headerPack.getTrackId(), counter); | ||||
|               counter++; | ||||
|             } | ||||
| 
 | ||||
|  | @ -712,16 +744,10 @@ namespace Mist{ | |||
|             } | ||||
|           } | ||||
| 
 | ||||
|           // if (pListIt->isUrl()){
 | ||||
|           endOfFile = segDowner.atEnd(); | ||||
|           if (!endOfFile){ | ||||
|           if (segDowner.readNext()){ | ||||
|             packet.FromPointer(segDowner.packetPtr); | ||||
|             segDowner.packetPtr += 188; | ||||
|             tsStream.parse(packet, entId); | ||||
|           } | ||||
|           //}else{
 | ||||
|           //  packet.FromStream(in);
 | ||||
|           //  endOfFile = in.eof();
 | ||||
|           //}
 | ||||
|         } | ||||
|         // get last packets
 | ||||
|         tsStream.finish(); | ||||
|  | @ -735,7 +761,7 @@ namespace Mist{ | |||
|             pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; | ||||
|             pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); | ||||
|             packetId = counter; | ||||
|             INFO_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(), | ||||
|             INFO_MSG("Added file %s, trackid: %zu, mapped to: %d", entryIt->filename.c_str(), | ||||
|                      headerPack.getTrackId(), counter); | ||||
|             counter++; | ||||
|           } | ||||
|  | @ -757,8 +783,6 @@ namespace Mist{ | |||
|           tsStream.getEarliestPacket(headerPack); | ||||
|         } | ||||
| 
 | ||||
|         // if (!pListIt->isUrl()){in.close();}
 | ||||
| 
 | ||||
|         if (hasHeader){break;} | ||||
|       } | ||||
|     } | ||||
|  | @ -781,7 +805,7 @@ namespace Mist{ | |||
|   void inputHLS::getNext(bool smart){ | ||||
|     INSANE_MSG("Getting next"); | ||||
|     uint32_t tid = 0; | ||||
|     static bool endOfFile = false; | ||||
|     bool finished = false; | ||||
|     if (selectedTracks.size()){tid = *selectedTracks.begin();} | ||||
|     thisPacket.null(); | ||||
|     while (config->is_active && (needsLock() || nProxy.userClient.isAlive())){ | ||||
|  | @ -789,7 +813,7 @@ namespace Mist{ | |||
|       // Check if we have a packet
 | ||||
|       bool hasPacket = false; | ||||
|       if (streamIsLive){ | ||||
|         hasPacket = tsStream.hasPacketOnEachTrack() || (endOfFile && tsStream.hasPacket()); | ||||
|         hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket()); | ||||
|       }else{ | ||||
|         hasPacket = tsStream.hasPacket(getMappedTrackId(tid)); | ||||
|       } | ||||
|  | @ -826,7 +850,7 @@ namespace Mist{ | |||
|             int64_t prevOffset = plsTimeOffset[currentPlaylist]; | ||||
|             plsTimeOffset[currentPlaylist] = (nUTC - zUTC) - thisPacket.getTime(); | ||||
|             newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist]; | ||||
|             INFO_MSG("[UTC; New offset: %" PRId64 " -> %" PRId64 "] Packet %lu@%" PRIu64 | ||||
|             INFO_MSG("[UTC; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu32 "@%" PRIu64 | ||||
|                      "ms -> %" PRIu64 "ms", | ||||
|                      prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime); | ||||
|           } | ||||
|  | @ -842,8 +866,8 @@ namespace Mist{ | |||
|                 plsTimeOffset[currentPlaylist] += | ||||
|                     (int64_t)(plsLastTime[currentPlaylist] + plsInterval[currentPlaylist]) - (int64_t)newTime; | ||||
|                 newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist]; | ||||
|                 INFO_MSG("[Guess; New offset: %" PRId64 " -> %" PRId64 "] Packet %lu@%" PRIu64 | ||||
|                          "ms -> %" PRIu64 "ms", | ||||
|                 INFO_MSG("[Guess; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu32 | ||||
|                          "@%" PRIu64 "ms -> %" PRIu64 "ms", | ||||
|                          prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime); | ||||
|               } | ||||
|             } | ||||
|  | @ -866,25 +890,24 @@ namespace Mist{ | |||
|       } | ||||
| 
 | ||||
|       // No? Let's read some more data and check again.
 | ||||
|       if (!segDowner.atEnd()){ | ||||
|       if (!segDowner.atEnd() && segDowner.readNext()){ | ||||
|         tsBuf.FromPointer(segDowner.packetPtr); | ||||
|         segDowner.packetPtr += 188; | ||||
|         tsStream.parse(tsBuf, 0); | ||||
|         tsStream.parse(tsBuf, streamIsLive ? 0 : currentIndex); | ||||
|         continue; // check again
 | ||||
|       } | ||||
| 
 | ||||
|       // Okay, reading more is not possible. Let's call finish() and check again.
 | ||||
|       if (!endOfFile){ | ||||
|         endOfFile = true; // we reached the end of file
 | ||||
|       if (!finished && segDowner.atEnd()){ | ||||
|         tsStream.finish(); | ||||
|         finished = true; | ||||
|         VERYHIGH_MSG("Finishing reading TS segment"); | ||||
|         continue; // Check again!
 | ||||
|       } | ||||
| 
 | ||||
|       // No? Then we try to read the next file.
 | ||||
|       //
 | ||||
|       currentPlaylist = firstSegment(); | ||||
|       // No? Then we want to try reading the next file.
 | ||||
| 
 | ||||
|       // No segments? Wait until next playlist reloading time.
 | ||||
|       currentPlaylist = firstSegment(); | ||||
|       if (currentPlaylist < 0){ | ||||
|         VERYHIGH_MSG("Waiting for segments..."); | ||||
|         if (nProxy.userClient.isAlive()){nProxy.userClient.keepAlive();} | ||||
|  | @ -896,14 +919,16 @@ namespace Mist{ | |||
|       VERYHIGH_MSG("Moving on to next TS segment (variant %" PRIu32 ")", currentPlaylist); | ||||
|       if (readNextFile()){ | ||||
|         MEDIUM_MSG("Next segment read successfully"); | ||||
|         endOfFile = false; // no longer at end of file
 | ||||
|         continue;          // Success! Continue regular parsing.
 | ||||
|         finished = false; | ||||
|         continue; // Success! Continue regular parsing.
 | ||||
|       }else{ | ||||
|         // failed to read segment for playlist, dropping it
 | ||||
|         WARN_MSG("Dropping variant %" PRIu32 " because we couldn't read anything from it", currentPlaylist); | ||||
|         tthread::lock_guard<tthread::mutex> guard(entryMutex); | ||||
|         listEntries.erase(currentPlaylist); | ||||
|         if (listEntries.size()){continue;} | ||||
|         if (selectedTracks.size() > 1){ | ||||
|           // failed to read segment for playlist, dropping it
 | ||||
|           WARN_MSG("Dropping variant %" PRIu32 " because we couldn't read anything from it", currentPlaylist); | ||||
|           tthread::lock_guard<tthread::mutex> guard(entryMutex); | ||||
|           listEntries.erase(currentPlaylist); | ||||
|           if (listEntries.size()){continue;} | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       // Nothing works!
 | ||||
|  | @ -914,22 +939,11 @@ namespace Mist{ | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   void inputHLS::readPMT(){ | ||||
|     HIGH_MSG("readPMT()"); | ||||
|     TS::Packet tsBuffer; | ||||
|     const char *tmpPtr = segDowner.segDL.data().data(); | ||||
| 
 | ||||
|     while (!tsStream.hasPacketOnEachTrack() && | ||||
|            (tmpPtr - segDowner.segDL.data().data() + 188 <= segDowner.segDL.data().size())){ | ||||
|       tsBuffer.FromPointer(tmpPtr); | ||||
|       tsStream.parse(tsBuffer, 0); | ||||
|       tmpPtr += 188; | ||||
|     } | ||||
|     tsStream.partialClear(); | ||||
|   } | ||||
| 
 | ||||
|   // Note: bpos is overloaded here for playlist entry!
 | ||||
|   void inputHLS::seek(int seekTime){ | ||||
|     plsTimeOffset.clear(); | ||||
|     plsLastTime.clear(); | ||||
|     plsInterval.clear(); | ||||
|     tsStream.clear(); | ||||
|     int trackId = 0; | ||||
| 
 | ||||
|  | @ -953,17 +967,32 @@ namespace Mist{ | |||
|     } | ||||
| 
 | ||||
|     currentIndex = plistEntry - 1; | ||||
|     INFO_MSG("Current index = %d", currentIndex); | ||||
| 
 | ||||
|     currentPlaylist = getMappedTrackPlaylist(trackId); | ||||
|     INFO_MSG("Seeking to index %d on playlist %d", currentIndex, currentPlaylist); | ||||
| 
 | ||||
|     {// Lock mutex for listEntries
 | ||||
|       tthread::lock_guard<tthread::mutex> guard(entryMutex); | ||||
|       if (!listEntries.count(currentPlaylist)){ | ||||
|         WARN_MSG("Playlist %d not loaded, aborting seek", currentPlaylist); | ||||
|         return; | ||||
|       } | ||||
|       std::deque<playListEntries> &curPlaylist = listEntries[currentPlaylist]; | ||||
|       if (curPlaylist.size() <= currentIndex){ | ||||
|         WARN_MSG("Playlist %d has %zu <= %d entries, aborting seek", currentPlaylist, | ||||
|                  curPlaylist.size(), currentIndex); | ||||
|         return; | ||||
|       } | ||||
|       playListEntries &entry = curPlaylist.at(currentIndex); | ||||
|       segDowner.loadSegment(entry); | ||||
|     } | ||||
|     readPMT(); | ||||
| 
 | ||||
|     HIGH_MSG("readPMT()"); | ||||
|     TS::Packet tsBuffer; | ||||
|     while (!tsStream.hasPacketOnEachTrack() && !segDowner.atEnd()){ | ||||
|       if (!segDowner.readNext()){break;} | ||||
|       tsBuffer.FromPointer(segDowner.packetPtr); | ||||
|       tsStream.parse(tsBuffer, 0); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   int inputHLS::getEntryId(int playlistId, uint64_t bytePos){ | ||||
|  | @ -990,6 +1019,10 @@ namespace Mist{ | |||
|   } | ||||
| 
 | ||||
|   uint32_t inputHLS::getMappedTrackPlaylist(uint64_t id){ | ||||
|     if (!pidMappingR.count(id)){ | ||||
|       FAIL_MSG("No mapping found for track ID %" PRIu64, id); | ||||
|       return 0; | ||||
|     } | ||||
|     static uint64_t lastIn = id; | ||||
|     static uint32_t lastOut = (pidMappingR[id] >> 32); | ||||
|     if (lastIn != id){ | ||||
|  | @ -1171,8 +1204,33 @@ namespace Mist{ | |||
|         WARN_MSG("no entries found in playlist: %d!", currentPlaylist); | ||||
|         return false; | ||||
|       } | ||||
|       ntry = curList.front(); | ||||
|       curList.pop_front(); | ||||
|       if (!streamIsLive){ | ||||
|         // VoD advances the index by one and attempts to read
 | ||||
|         // The playlist is not altered in this case, since we may need to seek back later
 | ||||
|         currentIndex++; | ||||
|         if (curList.size() - 1 < currentIndex){ | ||||
|           INFO_MSG("Reached last entry"); | ||||
|           return false; | ||||
|         } | ||||
|         ntry = curList[currentIndex]; | ||||
|       }else{ | ||||
|         // Live does not use the currentIndex, but simply takes the first segment
 | ||||
|         // That segment is then removed from the playlist so we don't read it again - live streams can't seek anyway
 | ||||
|         if (!curList.size()){ | ||||
|           INFO_MSG("Reached last entry"); | ||||
|           return false; | ||||
|         } | ||||
|         ntry = *curList.begin(); | ||||
|         curList.pop_front(); | ||||
| 
 | ||||
|         if (Util::bootSecs() < ntry.timestamp){ | ||||
|           VERYHIGH_MSG("Slowing down to realtime..."); | ||||
|           while (Util::bootSecs() < ntry.timestamp){ | ||||
|             if (nProxy.userClient.isAlive()){nProxy.userClient.keepAlive();} | ||||
|             Util::wait(250); | ||||
|           } | ||||
|         } | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     if (!segDowner.loadSegment(ntry)){ | ||||
|  |  | |||
|  | @ -9,8 +9,8 @@ | |||
| #include <string> | ||||
| #include <vector> | ||||
| //#include <stdint.h>
 | ||||
| #include <mist/downloader.h> | ||||
| #include <mist/http_parser.h> | ||||
| #include <mist/urireader.h> | ||||
| 
 | ||||
| #define BUFFERTIME 10 | ||||
| 
 | ||||
|  | @ -39,10 +39,19 @@ namespace Mist{ | |||
|   class SegmentDownloader{ | ||||
|   public: | ||||
|     SegmentDownloader(); | ||||
|     HTTP::Downloader segDL; | ||||
|     const char *packetPtr; | ||||
|     HTTP::URIReader segDL; | ||||
|     char *packetPtr; | ||||
|     bool loadSegment(const playListEntries &entry); | ||||
|     bool readNext(); | ||||
|     void close(); | ||||
|     bool atEnd() const; | ||||
| 
 | ||||
|   private: | ||||
|     bool encrypted; | ||||
|     Util::ResizeablePointer outData; | ||||
|     size_t encOffset; | ||||
|     unsigned char tmpIvec[16]; | ||||
|     mbedtls_aes_context aes; | ||||
|   }; | ||||
| 
 | ||||
|   class Playlist{ | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Thulinma
						Thulinma