#include "io.h" #include #include #include #include #include #include #include //LTS #include #include #include namespace Mist{ InOutBase::InOutBase() : M(meta){} /// Returns the ID of the main selected track, or 0 if no tracks are selected. /// The main track is the first video track, if any, and otherwise the first other track. /// Returns INVALID_TRACK_ID if there are no valid selected tracks. /// Refreshes the metadata to make sure we don't return unloaded tracks. size_t InOutBase::getMainSelectedTrack(){ if (!userSelect.size()){return INVALID_TRACK_ID;} size_t bestSoFar = INVALID_TRACK_ID; meta.refresh(); for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ if (meta.trackValid(it->first)){ if (meta.getType(it->first) == "video"){return it->first;} bestSoFar = it->first; } } return bestSoFar; } /// Starts the buffering of a new page. /// /// Does not do any actual buffering, just sets the right bits for buffering to go right. /// /// Buffering itself is done by bufferNext(). ///\param tid The trackid of the page to start buffering ///\param pageNumber The number of the page to start buffering bool InOutBase::bufferStart(size_t idx, size_t pageNumber){ VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %zu", streamName.c_str(), idx, pageNumber); // Initialize the stream metadata if it does not yet exist #ifndef TSLIVE_INPUT if (!meta){meta.reInit(streamName);} #endif if (!meta.getValidTracks().size()){ meta.clear(); return false; } // If we are currently buffering a page, abandon it completely and print a message about this // This page will NEVER be deleted, unless we open it again later. if (curPage.count(idx)){ WARN_MSG("Abandoning current page (%zu) for track %zu", curPageNum[idx], idx); curPage.erase(idx); curPageNum.erase(idx); } Util::RelAccX &tPages = meta.pages(idx); size_t pageIdx = INVALID_PAGE_NUM; for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt("firstkey", i) == pageNumber){ pageIdx = i; break; } } // If this is not a valid page number on this track, stop buffering this page. if (pageIdx == INVALID_PAGE_NUM){ WARN_MSG("Aborting page buffer start: %zu is not a valid page number on track %zu.", pageNumber, idx); std::stringstream test; for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ test << tPages.getInt("firstkey", i) << " "; } INFO_MSG("Valid page numbers: %s", test.str().c_str()); ///\return false if the pagenumber is not valid for this track return false; } // If the page is already buffered, ignore this request if (isBuffered(idx, pageNumber)){ INFO_MSG("Page %zu on track %zu already buffered", pageNumber, idx); ///\return false if the page was already buffered. return false; } // Open the correct page for the data char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, pageNumber); uint64_t pageSize = tPages.getInt("size", pageIdx); std::string pageName(pageId); curPage[idx].init(pageName, pageSize, true); // Make sure the data page is not destroyed when we are done buffering it later on. curPage[idx].master = false; // Store the pagenumber of the currently buffer page curPageNum[idx] = pageNumber; // Set the current offset to 0, to allow for using it in bufferNext() tPages.setInt("avail", 0, pageIdx); HIGH_MSG("Start buffering page %zu on track %zu successful", pageNumber, idx); return true; } /// Removes a fully buffered page /// /// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function. ///\param tid The trackid to remove the page from ///\param pageNumber The number of the page to remove void InOutBase::bufferRemove(size_t idx, size_t pageNumber){ if (!standAlone){// A different process will handle this for us return; } Util::RelAccX &tPages = meta.pages(idx); size_t pageIdx = INVALID_PAGE_NUM; for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt("firstkey", i) == pageNumber){ pageIdx = i; break; } } // If the given pagenumber is not a valid page on this track, do nothing if (pageIdx == INVALID_PAGE_NUM){ INFO_MSG("Can't remove page %zu on track %zu as it is not a valid page number.", pageNumber, idx); return; } HIGH_MSG("Removing page %zu on track %zu from the corresponding metaPage", pageNumber, idx); tPages.setInt("avail", 0, pageIdx); // Open the correct page char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, pageNumber); std::string pageName(pageId); IPC::sharedPage toErase; #ifdef __CYGWIN__ toErase.init(pageName, 26 * 1024 * 1024, false, false); #else toErase.init(pageName, tPages.getInt("size", pageIdx), false, false); #endif // Set the master flag so that the page will be destroyed once it leaves scope #if defined(__CYGWIN__) || defined(_WIN32) IPC::releasePage(pageName); #endif toErase.master = true; // Remove the page from the tracks index page // Leaving scope here, the page will now be destroyed } /// Checks whether a key is buffered ///\param tid The trackid on which to locate the key ///\param keyNum The number of the keyframe to find bool InOutBase::isBuffered(size_t idx, uint32_t keyNum){ ///\return The result of bufferedOnPage(tid, keyNum) return bufferedOnPage(idx, keyNum) != INVALID_KEY_NUM; } /// Returns the pagenumber where this key is buffered on ///\param tid The trackid on which to locate the key ///\param keyNum The number of the keyframe to find size_t InOutBase::bufferedOnPage(size_t idx, size_t keyNum){ Util::RelAccX &tPages = meta.pages(idx); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); if (pageNum > keyNum) break; uint64_t keyCount = tPages.getInt("keycount", i); if (pageNum + keyCount - 1 < keyNum) continue; uint64_t avail = tPages.getInt("avail", i); return avail ? pageNum : INVALID_KEY_NUM; } return INVALID_KEY_NUM; } /// Buffers the next packet on the currently opened page ///\param pack The packet to buffer void InOutBase::bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ size_t packDataLen = 24 + (packOffset ? 17 : 0) + (packBytePos ? 15 : 0) + (isKeyframe ? 19 : 0) + packDataSize + 11; static bool multiWrong = false; // Save the trackid of the track for easier access if (packTrack == INVALID_TRACK_ID){ WARN_MSG("Packet with id %" PRIu32 " has an invalid track", packTrack); return; } // these checks were already done in bufferSinglePacket, but we check again just to be sure if (meta.getLive() && packTime < meta.getLastms(packTrack)){ DEBUG_MSG(((multiWrong == 0) ? DLVL_WARN : DLVL_HIGH), "Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack, packTime, meta.getLastms(packTrack)); multiWrong = true; return; } // Do nothing if no page is opened for this track if (!curPage.count(packTrack)){ INFO_MSG("Trying to buffer a packet on track %" PRIu32 ", but no page is initialized", packTrack); return; } multiWrong = false; IPC::sharedPage &myPage = curPage[packTrack]; Util::RelAccX &tPages = meta.pages(packTrack); size_t pageIdx = 0; size_t currPagNum = curPageNum[packTrack]; Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey"); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ if (tPages.getInt(firstkey, i) == currPagNum){ pageIdx = i; break; } } // Save the current write position uint64_t pageOffset = tPages.getInt("avail", pageIdx); uint64_t pageSize = tPages.getInt("size", pageIdx); // Do nothing when there is not enough free space on the page to add the packet. if (pageSize - pageOffset < packDataLen){ FAIL_MSG("Track %" PRIu32 "p%zu : Pack %" PRIu64 "ms of %" PRIu64 "b exceeds size %" PRIu64 " @ bpos %" PRIu64, packTrack, currPagNum, packTime, packDataLen, pageSize, pageOffset); return; } // First generate only the payload on the correct destination // Leaves the 20 bytes inbetween empty to ensure the data is not accidentally read before it is // complete char *data = myPage.mapped + pageOffset; data[20] = 0xE0; // start container object unsigned int offset = 21; if (packOffset){ memcpy(data + offset, "\000\006offset\001", 9); Bit::htobll(data + offset + 9, packOffset); offset += 17; } if (packBytePos){ memcpy(data + offset, "\000\004bpos\001", 7); Bit::htobll(data + offset + 7, packBytePos); offset += 15; } if (isKeyframe){ memcpy(data + offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19); offset += 19; } memcpy(data + offset, "\000\004data\002", 7); Bit::htobl(data + offset + 7, packDataSize); memcpy(data + offset + 11, packData ? packData : 0, packDataSize); // finish container with 0x0000EE memcpy(data + offset + 11 + packDataSize, "\000\000\356", 3); // Copy the remaining values in reverse order: // 8 byte timestamp Bit::htobll(myPage.mapped + pageOffset + 12, packTime); // The mapped track id Bit::htobl(myPage.mapped + pageOffset + 8, packTrack); // Write the size Bit::htobl(myPage.mapped + pageOffset + 4, packDataLen - 8); // write the 'DTP2' bytes to conclude the packet and allow for reading it memcpy(myPage.mapped + pageOffset, "DTP2", 4); if (M.getLive()){ meta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe); } tPages.setInt("avail", pageOffset + packDataLen, pageIdx); } /// Wraps up the buffering of a shared memory data page /// /// Registers the data page on the track index page as well ///\param tid The trackid of the page to finalize void InOutBase::bufferFinalize(size_t idx){ // If no page is open, do nothing if (!curPage.count(idx)){ INFO_MSG("Trying to finalize the current page on track %zu, but no page is initialized", idx); return; } /// \TODO META Re-Implement for Cygwin/Win32! #if defined(__CYGWIN__) || defined(_WIN32) static int wipedAlready = 0; if (lowest && lowest > wipedAlready + 1){ for (int curr = wipedAlready + 1; curr < lowest; ++curr){ char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, curr); IPC::releasePage(std::string(pageId)); } } // Print a message about registering the page or not. if (inserted){IPC::preservePage(curPage[idx].name);} #endif // Close our link to the page. This will NOT destroy the shared page, as we've set master to // false upon construction Note: if there was a registering failure above, this WILL destroy the // shared page, to prevent a memory leak curPage.erase(idx); curPageNum.erase(idx); } /// Buffers a live packet to a page. /// /// Handles both buffering and creation of new pages /// /// Initiates/continues negotiation with the buffer as well ///\param packet The packet to buffer void InOutBase::bufferLivePacket(const DTSC::Packet &packet){ size_t idx = M.trackIDToIndex(packet.getTrackId(), getpid()); if (idx == INVALID_TRACK_ID){ INFO_MSG("Packet for track %zu has no valid index!", packet.getTrackId()); return; } char *data; size_t dataLen; packet.getString("data", data, dataLen); bufferLivePacket(packet.getTime(), packet.getInt("offset"), idx, data, dataLen, packet.getInt("bpos"), packet.getFlag("keyframe")); /// \TODO META Build something that should actually be able to deal with "extra" values } void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ meta.refresh(); meta.setLive(); // Store the trackid for easier access // Do nothing if the trackid is invalid if (packTrack == INVALID_TRACK_ID){return;} // Store the trackid for easier access Util::RelAccX &tPages = meta.pages(packTrack); if (M.getType(packTrack) != "video"){ isKeyframe = false; if (!tPages.getEndPos()){ // Assume this is the first packet on the track isKeyframe = true; }else{ if (packTime - tPages.getInt("lastkeytime", tPages.getEndPos() - 1) >= AUDIO_KEY_INTERVAL){ isKeyframe = true; } } } // For live streams, ignore packets that make no sense // This also happens in bufferNext, with the same rules if (M.getLive()){ if (packTime < M.getLastms(packTrack)){ HIGH_MSG("Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack, packTime, M.getLastms(packTrack)); return; } if (packTime > M.getLastms(packTrack) + 30000 && M.getLastms(packTrack)){ WARN_MSG("Sudden jump in timestamp from %" PRIu64 " to %" PRIu64, M.getLastms(packTrack), packTime); } } // Determine if we need to open the next page uint32_t nextPageNum = INVALID_KEY_NUM; if (isKeyframe){ updateTrackFromKeyframe(packTrack, packData, packDataSize); uint64_t endPage = tPages.getEndPos(); // If there is no page, create it if (!endPage){ nextPageNum = 0; tPages.addRecords(1); tPages.setInt("firstkey", 0, 0); tPages.setInt("firsttime", packTime, 0); tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, 0); tPages.setInt("keycount", 0, 0); tPages.setInt("avail", 0, 0); ++endPage; } uint64_t prevPageTime = tPages.getInt("firsttime", endPage - 1); // Compare on 8 mb boundary and target duration if (tPages.getInt("avail", endPage - 1) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){ // Create the book keeping data for the new page nextPageNum = tPages.getInt("firstkey", endPage - 1) + tPages.getInt("keycount", endPage - 1); HIGH_MSG("Live page transition from %" PRIu32 ":%zu to %" PRIu32 ":%" PRIu32, packTrack, tPages.getInt("firstkey", endPage - 1), packTrack, nextPageNum); tPages.addRecords(1); tPages.setInt("firstkey", nextPageNum, endPage); tPages.setInt("firsttime", packTime, endPage); tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage); tPages.setInt("keycount", 0, endPage); tPages.setInt("avail", 0, endPage); ++endPage; } tPages.setInt("lastkeytime", packTime, endPage - 1); tPages.setInt("keycount", tPages.getInt("keycount", endPage - 1) + 1, endPage - 1); } // Set the pageNumber if it has not been set yet if (nextPageNum == INVALID_KEY_NUM){ if (curPageNum.count(packTrack)){ nextPageNum = curPageNum[packTrack]; }else{ nextPageNum = 0; } } // If we have no pages by track, we have not received a starting keyframe yet. Drop this packet. if (!tPages.getEndPos()){ INFO_MSG("Track %" PRIu32 " not starting with a keyframe!", packTrack); return; } if (curPage.count(packTrack) && !curPage[packTrack].exists()){ WARN_MSG("Data page was deleted - forcing source shutdown to prevent unstable state"); Util::logExitReason("data page was deleted, forcing shutdown to prevent unstable state"); bufferFinalize(packTrack); kill(getpid(), SIGINT); return; } if (!curPageNum.count(packTrack) || nextPageNum != curPageNum[packTrack]){ if (curPageNum.count(packTrack)){ // Close the currently opened page when it exists bufferFinalize(packTrack); } // Open the new page if (!bufferStart(packTrack, nextPageNum)){ // if this fails, return instantly without actually buffering the packet WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); return; } } // Buffer the packet bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe); } ///Handles updating track metadata from a new keyframe, if applicable void InOutBase::updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize){ if (meta.getCodec(packTrack) == "H264"){ //H264 packets are 4-byte size-prepended NAL units size_t offset = 0; while (offset+4 < packDataSize){ uint32_t nalLen = Bit::btohl(packData+offset); if (nalLen+offset+4 > packDataSize){ FAIL_MSG("Corrupt H264 keyframe packet: NAL unit of size %" PRIu32 " at position %zu exceeds packet size of %zu", nalLen, offset, packDataSize); return; } uint8_t nalType = (packData[offset+4] & 0x1F); if (nalType == 7){//SPS, update width/height/FPS h264::SPSMeta hMeta = h264::sequenceParameterSet(packData+offset+4, nalLen).getCharacteristics(); meta.setWidth(packTrack, hMeta.width); meta.setHeight(packTrack, hMeta.height); meta.setFpks(packTrack, hMeta.fps*1000); } offset += nalLen+4; } } if (meta.getCodec(packTrack) == "VP8"){ //VP8 packets have a simple header for keyframes //Reference: https://www.rfc-editor.org/rfc/rfc6386.html#section-9.1 if (packData[3] == 0x9d && packData[4] == 0x01 && packData[5] == 0x2a){ //Probably a valid key frame uint16_t pixWidth = Bit::btohs_le(packData+6); uint16_t pixHeight = Bit::btohs_le(packData+8); uint32_t w = pixWidth & 0x3fff; uint32_t h = pixHeight & 0x3fff; switch (pixWidth >> 14){ case 1: w *= 5/4; break; case 2: w *= 5/3; break; case 3: w *= 2; break; } switch (pixHeight >> 14){ case 1: h *= 5/4; break; case 2: h *= 5/3; break; case 3: h *= 2; break; } meta.setWidth(packTrack, w); meta.setHeight(packTrack, h); } } } }// namespace Mist