mistserver/src/io.cpp
Thulinma 0af992d405 Various fixes, among which:
- Fixed segfault when attempting to initialseek on disconnected streams
- Fix 100% CPU bug in controller's stats code
- WebRTC UDP bind socket improvements
- Several segfault fixes
- Increased packet reordering buffer size from 30 to 150 packets
- Tweaks to default output/buffer behaviour for incoming pushes
- Added message for load balancer checks
- Fixed HLS content type
- Stats fixes
- Exit reason fixes
- Fixed socket IP address detection
- Fixed non-string arguments for stream settings
- Added caching for getConnectedBinHost()
- Added WebRTC playback rate control
- Added/completed VP8/VP9 support to WebRTC/RTSP
- Added live seek option to WebRTC
- Fixed seek to exactly newest timestamp
- Fixed HLS input

# Conflicts:
#	lib/defines.h
#	src/input/input.cpp
2021-10-19 22:29:40 +02:00

421 lines
16 KiB
C++

#include "io.h"
#include <cstdlib>
#include <mist/auth.h>
#include <mist/bitfields.h>
#include <mist/encode.h>
#include <mist/http_parser.h>
#include <mist/json.h>
#include <mist/langcodes.h> //LTS
#include <mist/stream.h>
namespace Mist{
/// Constructs the I/O base, initialising the member M from the member meta.
// NOTE(review): M is presumably a reference alias for meta used for read-only access — confirm in io.h.
InOutBase::InOutBase() : M(meta){}
/// Returns the ID of the main selected track, or 0 if no tracks are selected.
/// The main track is the first video track, if any, and otherwise the first other track.
/// Returns INVALID_TRACK_ID if there are no valid selected tracks.
/// Refreshes the metadata to make sure we don't return unloaded tracks.
size_t InOutBase::getMainSelectedTrack(){
if (!userSelect.size()){return INVALID_TRACK_ID;}
size_t bestSoFar = INVALID_TRACK_ID;
meta.refresh();
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (meta.trackValid(it->first)){
if (meta.getType(it->first) == "video"){return it->first;}
bestSoFar = it->first;
}
}
return bestSoFar;
}
/// Starts the buffering of a new page.
///
/// Does not do any actual buffering, just sets the right bits for buffering to go right.
///
/// Buffering itself is done by bufferNext().
///\param tid The trackid of the page to start buffering
///\param pageNumber The number of the page to start buffering
bool InOutBase::bufferStart(size_t idx, size_t pageNumber){
VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %zu", streamName.c_str(), idx, pageNumber);
// Initialize the stream metadata if it does not yet exist
#ifndef TSLIVE_INPUT
if (!meta){meta.reInit(streamName);}
#endif
if (!meta.getValidTracks().size()){
meta.clear();
return false;
}
// If we are currently buffering a page, abandon it completely and print a message about this
// This page will NEVER be deleted, unless we open it again later.
if (curPage.count(idx)){
WARN_MSG("Abandoning current page (%zu) for track %zu", curPageNum[idx], idx);
curPage.erase(idx);
curPageNum.erase(idx);
}
Util::RelAccX &tPages = meta.pages(idx);
size_t pageIdx = INVALID_PAGE_NUM;
for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
if (tPages.getInt("firstkey", i) == pageNumber){
pageIdx = i;
break;
}
}
// If this is not a valid page number on this track, stop buffering this page.
if (pageIdx == INVALID_PAGE_NUM){
WARN_MSG("Aborting page buffer start: %zu is not a valid page number on track %zu.", pageNumber, idx);
std::stringstream test;
for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
test << tPages.getInt("firstkey", i) << " ";
}
INFO_MSG("Valid page numbers: %s", test.str().c_str());
///\return false if the pagenumber is not valid for this track
return false;
}
// If the page is already buffered, ignore this request
if (isBuffered(idx, pageNumber)){
INFO_MSG("Page %zu on track %zu already buffered", pageNumber, idx);
///\return false if the page was already buffered.
return false;
}
// Open the correct page for the data
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, pageNumber);
uint64_t pageSize = tPages.getInt("size", pageIdx);
std::string pageName(pageId);
curPage[idx].init(pageName, pageSize, true);
// Make sure the data page is not destroyed when we are done buffering it later on.
curPage[idx].master = false;
// Store the pagenumber of the currently buffer page
curPageNum[idx] = pageNumber;
// Set the current offset to 0, to allow for using it in bufferNext()
tPages.setInt("avail", 0, pageIdx);
HIGH_MSG("Start buffering page %zu on track %zu successful", pageNumber, idx);
return true;
}
/// Removes a fully buffered page
///
/// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function.
///\param tid The trackid to remove the page from
///\param pageNumber The number of the page to remove
void InOutBase::bufferRemove(size_t idx, size_t pageNumber){
if (!standAlone){// A different process will handle this for us
return;
}
Util::RelAccX &tPages = meta.pages(idx);
size_t pageIdx = INVALID_PAGE_NUM;
for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
if (tPages.getInt("firstkey", i) == pageNumber){
pageIdx = i;
break;
}
}
// If the given pagenumber is not a valid page on this track, do nothing
if (pageIdx == INVALID_PAGE_NUM){
INFO_MSG("Can't remove page %zu on track %zu as it is not a valid page number.", pageNumber, idx);
return;
}
HIGH_MSG("Removing page %zu on track %zu from the corresponding metaPage", pageNumber, idx);
tPages.setInt("avail", 0, pageIdx);
// Open the correct page
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, pageNumber);
std::string pageName(pageId);
IPC::sharedPage toErase;
#ifdef __CYGWIN__
toErase.init(pageName, 26 * 1024 * 1024, false);
#else
toErase.init(pageName, tPages.getInt("size", pageIdx), false);
#endif
// Set the master flag so that the page will be destroyed once it leaves scope
#if defined(__CYGWIN__) || defined(_WIN32)
IPC::releasePage(pageName);
#endif
toErase.master = true;
// Remove the page from the tracks index page
// Leaving scope here, the page will now be destroyed
}
/// Tells whether a key is currently buffered.
///\param idx The index of the track on which to locate the key
///\param keyNum The number of the keyframe to find
///\return true when bufferedOnPage(idx, keyNum) reports a page, false when it reports
/// INVALID_KEY_NUM.
bool InOutBase::isBuffered(size_t idx, uint32_t keyNum){
  const size_t onPage = bufferedOnPage(idx, keyNum);
  if (onPage == INVALID_KEY_NUM){return false;}
  return true;
}
/// Returns the pagenumber where this key is buffered on
///\param tid The trackid on which to locate the key
///\param keyNum The number of the keyframe to find
size_t InOutBase::bufferedOnPage(size_t idx, size_t keyNum){
Util::RelAccX &tPages = meta.pages(idx);
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
uint64_t pageNum = tPages.getInt("firstkey", i);
if (pageNum > keyNum) break;
uint64_t keyCount = tPages.getInt("keycount", i);
if (pageNum + keyCount - 1 < keyNum) continue;
uint64_t avail = tPages.getInt("avail", i);
return avail ? pageNum : INVALID_KEY_NUM;
}
return INVALID_KEY_NUM;
}
/// Buffers the next packet on the currently opened page
///\param pack The packet to buffer
void InOutBase::bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe){
size_t packDataLen =
24 + (packOffset ? 17 : 0) + (packBytePos ? 15 : 0) + (isKeyframe ? 19 : 0) + packDataSize + 11;
static bool multiWrong = false;
// Save the trackid of the track for easier access
if (packTrack == INVALID_TRACK_ID){
WARN_MSG("Packet with id %" PRIu32 " has an invalid track", packTrack);
return;
}
// these checks were already done in bufferSinglePacket, but we check again just to be sure
if (meta.getLive() && packTime < meta.getLastms(packTrack)){
DEBUG_MSG(((multiWrong == 0) ? DLVL_WARN : DLVL_HIGH),
"Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack,
packTime, meta.getLastms(packTrack));
multiWrong = true;
return;
}
// Do nothing if no page is opened for this track
if (!curPage.count(packTrack)){
INFO_MSG("Trying to buffer a packet on track %" PRIu32 ", but no page is initialized", packTrack);
return;
}
multiWrong = false;
IPC::sharedPage &myPage = curPage[packTrack];
Util::RelAccX &tPages = meta.pages(packTrack);
size_t pageIdx = 0;
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
if (tPages.getInt("firstkey", i) == curPageNum[packTrack]){
pageIdx = i;
break;
}
}
// Save the current write position
uint64_t pageOffset = tPages.getInt("avail", pageIdx);
uint64_t pageSize = tPages.getInt("size", pageIdx);
// Do nothing when there is not enough free space on the page to add the packet.
if (pageSize - pageOffset < packDataLen){
FAIL_MSG("Track %" PRIu32 "p%zu : Pack %" PRIu64 "ms of %" PRIu64 "b exceeds size %" PRIu64 " @ bpos %" PRIu64,
packTrack, curPageNum[packTrack], packTime, packDataLen, pageSize, pageOffset);
return;
}
// First generate only the payload on the correct destination
// Leaves the 20 bytes inbetween empty to ensure the data is not accidentally read before it is
// complete
char *data = myPage.mapped + pageOffset;
data[20] = 0xE0; // start container object
unsigned int offset = 21;
if (packOffset){
memcpy(data + offset, "\000\006offset\001", 9);
Bit::htobll(data + offset + 9, packOffset);
offset += 17;
}
if (packBytePos){
memcpy(data + offset, "\000\004bpos\001", 7);
Bit::htobll(data + offset + 7, packBytePos);
offset += 15;
}
if (isKeyframe){
memcpy(data + offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19);
offset += 19;
}
memcpy(data + offset, "\000\004data\002", 7);
Bit::htobl(data + offset + 7, packDataSize);
memcpy(data + offset + 11, packData ? packData : 0, packDataSize);
// finish container with 0x0000EE
memcpy(data + offset + 11 + packDataSize, "\000\000\356", 3);
// Copy the remaining values in reverse order:
// 8 byte timestamp
Bit::htobll(myPage.mapped + pageOffset + 12, packTime);
// The mapped track id
Bit::htobl(myPage.mapped + pageOffset + 8, packTrack);
// Write the size
Bit::htobl(myPage.mapped + pageOffset + 4, packDataLen - 8);
// write the 'DTP2' bytes to conclude the packet and allow for reading it
memcpy(myPage.mapped + pageOffset, "DTP2", 4);
if (M.getLive()){
meta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe);
}
tPages.setInt("avail", pageOffset + packDataLen, pageIdx);
}
/// Wraps up the buffering of a shared memory data page.
///
/// Drops this object's entries for the track's current page from curPage and curPageNum.
/// Does nothing (apart from logging) when no page is open for the track.
///\param idx The index of the track whose current page should be finalized
void InOutBase::bufferFinalize(size_t idx){
// If no page is open, do nothing
if (!curPage.count(idx)){
INFO_MSG("Trying to finalize the current page on track %zu, but no page is initialized", idx);
return;
}
/// \TODO META Re-Implement for Cygwin/Win32!
// NOTE(review): the Windows-only block below uses `lowest` and `inserted`, neither of which is
// declared in this function, and `wipedAlready` is never updated. This is presumably left over
// from an earlier implementation (see the TODO above) — confirm before building for Cygwin/Win32.
#if defined(__CYGWIN__) || defined(_WIN32)
static int wipedAlready = 0;
if (lowest && lowest > wipedAlready + 1){
// Release the data pages numbered from wipedAlready + 1 up to (not including) lowest
for (int curr = wipedAlready + 1; curr < lowest; ++curr){
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), idx, curr);
IPC::releasePage(std::string(pageId));
}
}
// Preserve the current page by name when `inserted` is set
if (inserted){IPC::preservePage(curPage[idx].name);}
#endif
// Close our link to the page. bufferStart() sets master to false on the pages it opens, so
// dropping the handle here is presumably not meant to destroy the shared page itself.
curPage.erase(idx);
curPageNum.erase(idx);
}
/// Buffers a live packet to a page.
///
/// Resolves the packet's track ID to a track index for this process and forwards the packet's
/// time, "offset", "data", "bpos" and "keyframe" values to the field-based bufferLivePacket()
/// overload, which handles both buffering and creation of new pages.
/// Packets whose track has no valid index are dropped with a log message.
///\param packet The packet to buffer
void InOutBase::bufferLivePacket(const DTSC::Packet &packet){
  size_t idx = M.trackIDToIndex(packet.getTrackId(), getpid());
  if (idx == INVALID_TRACK_ID){
    INFO_MSG("Packet for track %zu has no valid index!", packet.getTrackId());
    return;
  }
  // Start from a well-defined "no payload" state, so that nothing indeterminate is forwarded
  // should getString() leave its output arguments untouched.
  char *data = 0;
  size_t dataLen = 0;
  packet.getString("data", data, dataLen);
  bufferLivePacket(packet.getTime(), packet.getInt("offset"), idx, data, dataLen,
                   packet.getInt("bpos"), packet.getFlag("keyframe"));
  /// \TODO META Build something that should actually be able to deal with "extra" values
}
/// Buffers a live packet, given as separate fields, to a page.
///
/// Refreshes the metadata and marks it as live, decides whether the packet counts as a keyframe,
/// maintains the track's page records (creating the first page and flipping to a new page when
/// needed), opens the right page through bufferFinalize()/bufferStart() and finally writes the
/// packet with bufferNext().
///\param packTime Timestamp of the packet
///\param packOffset Offset value of the packet, passed on to bufferNext()
///\param packTrack Index of the track the packet belongs to; INVALID_TRACK_ID makes this a no-op
///\param packData Pointer to the payload, passed on to bufferNext()
///\param packDataSize Number of payload bytes, passed on to bufferNext()
///\param packBytePos Byte position of the packet, passed on to bufferNext()
///\param isKeyframe Keyframe flag of the packet; recomputed here for tracks that are not video
void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe){
meta.refresh();
meta.setLive();
// Do nothing if the trackid is invalid
if (packTrack == INVALID_TRACK_ID){return;}
// Keep a reference to the page records of this track for easier access
Util::RelAccX &tPages = meta.pages(packTrack);
// Tracks that are not video ignore the passed keyframe flag and get keyframes assigned here
if (M.getType(packTrack) != "video"){
isKeyframe = false;
if (!tPages.getEndPos()){
// Assume this is the first packet on the track
isKeyframe = true;
}else{
// Start a new key once AUDIO_KEY_INTERVAL has passed since the last key time of the newest page
if (packTime - tPages.getInt("lastkeytime", tPages.getEndPos() - 1) >= AUDIO_KEY_INTERVAL){
isKeyframe = true;
}
}
}
// For live streams, ignore packets that make no sense
// This also happens in bufferNext, with the same rules
if (M.getLive()){
if (packTime < M.getLastms(packTrack)){
HIGH_MSG("Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack,
packTime, M.getLastms(packTrack));
return;
}
// A jump of more than 30000 past the last timestamp only produces a warning; the packet is still buffered
if (packTime > M.getLastms(packTrack) + 30000 && M.getLastms(packTrack)){
WARN_MSG("Sudden jump in timestamp from %" PRIu64 " to %" PRIu64, M.getLastms(packTrack), packTime);
}
}
// Determine if we need to open the next page
uint32_t nextPageNum = INVALID_KEY_NUM;
if (isKeyframe){
uint64_t endPage = tPages.getEndPos();
// If there is no page, create it
if (!endPage){
nextPageNum = 0;
tPages.addRecords(1);
tPages.setInt("firstkey", 0, 0);
tPages.setInt("firsttime", packTime, 0);
tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, 0);
tPages.setInt("keycount", 0, 0);
tPages.setInt("avail", 0, 0);
++endPage;
}
uint64_t prevPageTime = tPages.getInt("firsttime", endPage - 1);
// Flip to a new page when the newest page holds more than FLIP_DATA_PAGE_SIZE bytes or spans
// more than FLIP_TARGET_DURATION (original note: compare on 8 mb boundary and target duration)
if (tPages.getInt("avail", endPage - 1) > FLIP_DATA_PAGE_SIZE || packTime - prevPageTime > FLIP_TARGET_DURATION){
// Create the book keeping data for the new page
// The new page starts at the key right after the last key of the previous page
nextPageNum = tPages.getInt("firstkey", endPage - 1) + tPages.getInt("keycount", endPage - 1);
HIGH_MSG("Live page transition from %" PRIu32 ":%zu to %" PRIu32 ":%" PRIu32, packTrack,
tPages.getInt("firstkey", endPage - 1), packTrack, nextPageNum);
tPages.addRecords(1);
tPages.setInt("firstkey", nextPageNum, endPage);
tPages.setInt("firsttime", packTime, endPage);
tPages.setInt("size", DEFAULT_DATA_PAGE_SIZE, endPage);
tPages.setInt("keycount", 0, endPage);
tPages.setInt("avail", 0, endPage);
++endPage;
}
// Register this key on the newest page
tPages.setInt("lastkeytime", packTime, endPage - 1);
tPages.setInt("keycount", tPages.getInt("keycount", endPage - 1) + 1, endPage - 1);
}
// Set the pageNumber if it has not been set yet
// Stay on the currently opened page when there is one, otherwise fall back to page 0
if (nextPageNum == INVALID_KEY_NUM){
if (curPageNum.count(packTrack)){
nextPageNum = curPageNum[packTrack];
}else{
nextPageNum = 0;
}
}
// If we have no pages by track, we have not received a starting keyframe yet. Drop this packet.
if (!tPages.getEndPos()){
INFO_MSG("Track %" PRIu32 " not starting with a keyframe!", packTrack);
return;
}
// Switch pages when no page is open yet or when the target page differs from the open one
if (!curPageNum.count(packTrack) || nextPageNum != curPageNum[packTrack]){
if (curPageNum.count(packTrack)){
// Close the currently opened page when it exists
bufferFinalize(packTrack);
}
// Open the new page
if (!bufferStart(packTrack, nextPageNum)){
// if this fails, return instantly without actually buffering the packet
WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime);
return;
}
}
// Buffer the packet
bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe);
}
}// namespace Mist