1871 lines
72 KiB
C++
1871 lines
72 KiB
C++
#include <algorithm>
|
|
#include <fcntl.h>
|
|
#include <iterator> //std::distance
|
|
#include <semaphore.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/types.h>
|
|
#include <sys/wait.h>
|
|
#include <unistd.h>
|
|
|
|
#include "output.h"
|
|
#include <mist/bitfields.h>
|
|
#include <mist/defines.h>
|
|
#include <mist/h264.h>
|
|
#include <mist/http_parser.h>
|
|
#include <mist/stream.h>
|
|
#include <mist/timing.h>
|
|
#include <mist/util.h>
|
|
|
|
/*LTS-START*/
|
|
#include <arpa/inet.h>
|
|
#include <mist/langcodes.h>
|
|
#include <mist/triggers.h>
|
|
#include <netdb.h>
|
|
#include <sys/socket.h>
|
|
#include <sys/stat.h>
|
|
/*LTS-END*/
|
|
|
|
namespace Mist{
|
|
JSON::Value Output::capa = JSON::Value();
|
|
Util::Config *Output::config = NULL;
|
|
|
|
uint32_t getDTSCLen(char *mapped, uint64_t offset){return Bit::btohl(mapped + offset + 4);}
|
|
|
|
uint64_t getDTSCTime(char *mapped, uint64_t offset){return Bit::btohll(mapped + offset + 12);}
|
|
|
|
void Output::init(Util::Config *cfg){
|
|
capa["optional"]["debug"]["name"] = "debug";
|
|
capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed.";
|
|
capa["optional"]["debug"]["option"] = "--debug";
|
|
capa["optional"]["debug"]["type"] = "debug";
|
|
|
|
JSON::Value option;
|
|
option["long"] = "noinput";
|
|
option["short"] = "N";
|
|
option["help"] = "Do not start input if not already started";
|
|
option["value"].append(0);
|
|
cfg->addOption("noinput", option);
|
|
config = cfg;
|
|
}
|
|
|
|
Output::Output(Socket::Connection &conn) : myConn(conn){
|
|
pushing = false;
|
|
firstTime = 0;
|
|
firstPacketTime = 0xFFFFFFFFFFFFFFFFull;
|
|
lastPacketTime = 0;
|
|
crc = getpid();
|
|
parseData = false;
|
|
wantRequest = true;
|
|
sought = false;
|
|
isInitialized = false;
|
|
isBlocking = false;
|
|
needsLookAhead = 0;
|
|
extraKeepAway = 0;
|
|
lastStats = 0;
|
|
maxSkipAhead = 7500;
|
|
uaDelay = 10;
|
|
realTime = 1000;
|
|
lastRecv = Util::bootSecs();
|
|
if (myConn){
|
|
setBlocking(true);
|
|
//Make sure that if the socket is a non-stdio socket, we close it when forking
|
|
if (myConn.getSocket() > 2){
|
|
Util::Procs::socketList.insert(myConn.getSocket());
|
|
}
|
|
}else{
|
|
WARN_MSG("Warning: MistOut created with closed socket!");
|
|
}
|
|
sentHeader = false;
|
|
isRecordingToFile = false;
|
|
|
|
// If we have a streamname option, set internal streamname to that option
|
|
if (!streamName.size() && config->hasOption("streamname")){
|
|
streamName = config->getString("streamname");
|
|
Util::streamName = streamName;
|
|
}
|
|
|
|
/*LTS-START*/
|
|
// If we have a target, scan for trailing ?, remove it, parse into targetParams
|
|
if (config->hasOption("target")){
|
|
std::string tgt = config->getString("target");
|
|
if (tgt.rfind('?') != std::string::npos){
|
|
INFO_MSG("Stripping target options: %s", tgt.substr(tgt.rfind('?') + 1).c_str());
|
|
HTTP::parseVars(tgt.substr(tgt.rfind('?') + 1), targetParams);
|
|
config->getOption("target", true).append(tgt.substr(0, tgt.rfind('?')));
|
|
}else{
|
|
INFO_MSG("Not modifying target (%s), no options present", tgt.c_str());
|
|
}
|
|
}
|
|
if (targetParams.count("rate")){
|
|
long long int multiplier = JSON::Value(targetParams["rate"]).asInt();
|
|
if (multiplier){
|
|
realTime = 1000 / multiplier;
|
|
}else{
|
|
realTime = 0;
|
|
}
|
|
}
|
|
if (isRecording() && DTSC::trackValidMask == TRACK_VALID_EXT_HUMAN){
|
|
DTSC::trackValidMask = TRACK_VALID_EXT_PUSH;
|
|
}
|
|
/*LTS-END*/
|
|
}
|
|
|
|
bool Output::isFileTarget(){
|
|
VERYHIGH_MSG("Default file target handler (false)");
|
|
return false;
|
|
}
|
|
|
|
void Output::listener(Util::Config &conf, int (*callback)(Socket::Connection &S)){
|
|
conf.serveForkedSocket(callback);
|
|
}
|
|
|
|
void Output::setBlocking(bool blocking){
|
|
isBlocking = blocking;
|
|
myConn.setBlocking(isBlocking);
|
|
}
|
|
|
|
bool Output::isRecording(){
|
|
return config->hasOption("target") && config->getString("target").size();
|
|
}
|
|
|
|
/// Called when stream initialization has failed.
|
|
/// The standard implementation will set isInitialized to false and close the client connection,
|
|
/// thus causing the process to exit cleanly.
|
|
void Output::onFail(const std::string &msg, bool critical){
|
|
if (critical){
|
|
FAIL_MSG("onFail '%s': %s", streamName.c_str(), msg.c_str());
|
|
}else{
|
|
MEDIUM_MSG("onFail '%s': %s", streamName.c_str(), msg.c_str());
|
|
}
|
|
Util::logExitReason(msg.c_str());
|
|
isInitialized = false;
|
|
wantRequest = true;
|
|
parseData = false;
|
|
myConn.close();
|
|
}
|
|
|
|
void Output::initialize(){
|
|
MEDIUM_MSG("initialize");
|
|
if (isInitialized){return;}
|
|
if (streamName.size() < 1){
|
|
return; // abort - no stream to initialize...
|
|
}
|
|
reconnect();
|
|
// if the connection failed, fail
|
|
if (!meta || streamName.size() < 1){
|
|
onFail("Could not connect to stream", true);
|
|
return;
|
|
}
|
|
sought = false;
|
|
/*LTS-START*/
|
|
if (Triggers::shouldTrigger("CONN_PLAY", streamName)){
|
|
std::string payload =
|
|
streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n" + reqUrl;
|
|
if (!Triggers::doTrigger("CONN_PLAY", payload, streamName)){
|
|
onFail("Not allowed to play (CONN_PLAY)");
|
|
}
|
|
}
|
|
doSync(true);
|
|
/*LTS-END*/
|
|
}
|
|
|
|
/// If called with force set to true and a USER_NEW trigger enabled, forces a sync immediately.
|
|
/// Otherwise, does nothing unless the sync byte is set to 2, in which case it forces a sync as well.
|
|
/// May be called recursively because it calls stats() which calls this function.
|
|
/// If this happens, the extra calls to the function return instantly.
|
|
void Output::doSync(bool force){
|
|
static bool recursing = false;
|
|
if (!statComm){return;}
|
|
if (recursing){return;}
|
|
recursing = true;
|
|
if (statComm.getSync() == 2 || force){
|
|
if (getStatsName() == capa["name"].asStringRef() && Triggers::shouldTrigger("USER_NEW", streamName)){
|
|
// sync byte 0 = no sync yet, wait for sync from controller...
|
|
char initialSync = 0;
|
|
// attempt to load sync status from session cache in shm
|
|
{
|
|
IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 1);
|
|
if (cacheLock){cacheLock.wait();}
|
|
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false);
|
|
if (shmSessions.mapped){
|
|
char shmEmpty[SHM_SESSIONS_ITEM];
|
|
memset(shmEmpty, 0, SHM_SESSIONS_ITEM);
|
|
std::string host;
|
|
Socket::hostBytesToStr(statComm.getHost().data(), 16, host);
|
|
uint32_t shmOffset = 0;
|
|
const std::string &cName = capa["name"].asStringRef();
|
|
while (shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){
|
|
// compare crc
|
|
if (Bit::btohl(shmSessions.mapped + shmOffset) == statComm.getCRC()){
|
|
// compare stream name
|
|
if (strncmp(shmSessions.mapped + shmOffset + 4, streamName.c_str(), 100) == 0){
|
|
// compare connector
|
|
if (strncmp(shmSessions.mapped + shmOffset + 104, cName.c_str(), 20) == 0){
|
|
// compare host
|
|
if (strncmp(shmSessions.mapped + shmOffset + 124, host.c_str(), 40) == 0){
|
|
initialSync = shmSessions.mapped[shmOffset + 164];
|
|
INFO_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// stop if we reached the end
|
|
if (memcmp(shmSessions.mapped + shmOffset, shmEmpty, SHM_SESSIONS_ITEM) == 0){
|
|
break;
|
|
}
|
|
shmOffset += SHM_SESSIONS_ITEM;
|
|
}
|
|
}
|
|
if (cacheLock){cacheLock.post();}
|
|
}
|
|
unsigned int i = 0;
|
|
statComm.setSync(initialSync);
|
|
// wait max 10 seconds for sync
|
|
while ((!statComm.getSync() || statComm.getSync() == 2) && i++ < 100){
|
|
Util::wait(100);
|
|
stats(true);
|
|
}
|
|
HIGH_MSG("USER_NEW sync achieved: %u", statComm.getSync());
|
|
// 1 = check requested (connection is new)
|
|
if (statComm.getSync() == 1){
|
|
std::string payload = streamName + "\n" + getConnectedHost() + "\n" +
|
|
JSON::Value(crc).asString() + "\n" + capa["name"].asStringRef() +
|
|
"\n" + reqUrl + "\n" + statComm.getSessId();
|
|
if (!Triggers::doTrigger("USER_NEW", payload, streamName)){
|
|
onFail("Not allowed to play (USER_NEW)");
|
|
statComm.setSync(100); // 100 = denied
|
|
}else{
|
|
statComm.setSync(10); // 10 = accepted
|
|
}
|
|
}
|
|
// 100 = denied
|
|
if (statComm.getSync() == 100){onFail("Not allowed to play (USER_NEW cache)");}
|
|
if (statComm.getSync() == 0){
|
|
onFail("Not allowed to play (USER_NEW init timeout)", true);
|
|
}
|
|
if (statComm.getSync() == 2){
|
|
onFail("Not allowed to play (USER_NEW re-init timeout)", true);
|
|
}
|
|
// anything else = accepted
|
|
}else{
|
|
statComm.setSync(10); // auto-accept if no trigger
|
|
}
|
|
}
|
|
recursing = false;
|
|
}
|
|
|
|
std::string Output::getConnectedHost(){return myConn.getHost();}
|
|
|
|
std::string Output::getConnectedBinHost(){
|
|
if (!prevHost.size()){
|
|
if (myConn && myConn.getPureSocket() != -1){
|
|
prevHost = myConn.getBinHost();
|
|
}
|
|
if (!prevHost.size()){prevHost.assign("\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001", 16);}
|
|
}
|
|
return prevHost;
|
|
}
|
|
|
|
bool Output::isReadyForPlay(){
|
|
// If a protocol does not support any codecs, we assume you know what you're doing
|
|
if (!capa.isMember("codecs")){return true;}
|
|
if (!isInitialized){initialize();}
|
|
meta.refresh();
|
|
if (getSupportedTracks().size()){
|
|
if (!userSelect.size()){selectDefaultTracks();}
|
|
size_t mainTrack = getMainSelectedTrack();
|
|
if (mainTrack != INVALID_TRACK_ID){
|
|
DTSC::Keys keys(M.keys(mainTrack));
|
|
if (keys.getValidCount() >= 2 || M.getLastms(mainTrack) - M.getFirstms(mainTrack) > 5000){
|
|
return true;
|
|
}
|
|
HIGH_MSG("NOT READY YET (%zu tracks, main track: %zu, with %zu keys)",
|
|
M.getValidTracks().size(), getMainSelectedTrack(), keys.getValidCount());
|
|
}else{
|
|
HIGH_MSG("NOT READY YET (%zu tracks)", getSupportedTracks().size());
|
|
}
|
|
}else{
|
|
HIGH_MSG("NOT READY (%zu tracks)", getSupportedTracks().size());
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/// Disconnects from all stat/user-related shared structures.
|
|
void Output::disconnect(){
|
|
MEDIUM_MSG("disconnect");
|
|
if (statComm){
|
|
statComm.unload();
|
|
myConn.resetCounter();
|
|
}
|
|
userSelect.clear();
|
|
isInitialized = false;
|
|
meta.clear();
|
|
}
|
|
|
|
/// Connects or reconnects to the stream.
|
|
/// Assumes streamName class member has been set already.
|
|
/// Will start input if not currently active, calls onFail() if this does not succeed.
|
|
/// After assuring stream is online, clears metaPages, then sets metaPages[0], statistics and
|
|
/// userClient to (hopefully) valid handles.
|
|
void Output::reconnect(){
|
|
thisPacket.null();
|
|
if (config->hasOption("noinput") && config->getBool("noinput")){
|
|
Util::sanitizeName(streamName);
|
|
if (!Util::streamAlive(streamName)){
|
|
onFail("Stream not active already, aborting");
|
|
return;
|
|
}
|
|
}else{
|
|
if (!Util::startInput(streamName, "", true, isPushing())){
|
|
// If stream is configured, use fallback stream setting, if set.
|
|
JSON::Value strCnf = Util::getStreamConfig(streamName);
|
|
if (strCnf && strCnf["fallback_stream"].asStringRef().size()){
|
|
streamName = strCnf["fallback_stream"].asStringRef();
|
|
Util::streamName = streamName;
|
|
INFO_MSG("Switching to configured fallback stream '%s'", streamName.c_str());
|
|
reconnect();
|
|
return;
|
|
}
|
|
|
|
// Not configured or no fallback stream? Use the default stream handler instead
|
|
// Note: Since fallback stream is handled recursively, the defaultStream handler
|
|
// may still be triggered for the fallback stream! This is intentional.
|
|
JSON::Value defStrmJson = Util::getGlobalConfig("defaultStream");
|
|
std::string defStrm = defStrmJson.asString();
|
|
if (Triggers::shouldTrigger("DEFAULT_STREAM", streamName)){
|
|
std::string payload = defStrm + "\n" + streamName + "\n" + getConnectedHost() + "\n" +
|
|
capa["name"].asStringRef() + "\n" + reqUrl;
|
|
// The return value is ignored, because the response (defStrm in this case) tells us what to do next, if anything.
|
|
Triggers::doTrigger("DEFAULT_STREAM", payload, streamName, false, defStrm);
|
|
}
|
|
if (!defStrm.size()){
|
|
onFail("Stream open failed", true);
|
|
return;
|
|
}
|
|
std::string newStrm = defStrm;
|
|
Util::streamVariables(newStrm, streamName, "");
|
|
if (streamName == newStrm){
|
|
onFail("Stream open failed; nothing to fall back to (" + defStrm + " == " + newStrm + ")", true);
|
|
return;
|
|
}
|
|
INFO_MSG("Stream open failed; falling back to default stream '%s' -> '%s'", defStrm.c_str(),
|
|
newStrm.c_str());
|
|
std::string origStream = streamName;
|
|
streamName = newStrm;
|
|
Util::streamName = streamName;
|
|
if (!Util::startInput(streamName, "", true, isPushing())){
|
|
onFail("Stream open failed (fallback stream for '" + origStream + "')", true);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
disconnect();
|
|
|
|
meta.reInit(streamName, false);
|
|
unsigned int attempts = 0;
|
|
while (!meta && ++attempts < 20 && Util::streamAlive(streamName)){
|
|
meta.reInit(streamName, false);
|
|
}
|
|
if (!meta){return;}
|
|
meta.refresh();
|
|
isInitialized = true;
|
|
statComm.reload();
|
|
stats(true);
|
|
if (isPushing()){return;}
|
|
if (!isRecording() && !M.getVod() && !isReadyForPlay()){
|
|
uint64_t waitUntil = Util::bootSecs() + 45;
|
|
while (!M.getVod() && !isReadyForPlay()){
|
|
if (Util::bootSecs() > waitUntil || (!userSelect.size() && Util::bootSecs() > waitUntil)){
|
|
INFO_MSG("Giving up waiting for playable tracks. IP: %s", getConnectedHost().c_str());
|
|
break;
|
|
}
|
|
Util::wait(500);
|
|
meta.refresh();
|
|
stats();
|
|
}
|
|
}
|
|
selectDefaultTracks();
|
|
}
|
|
|
|
std::set<size_t> Output::getSupportedTracks(const std::string &type) const{
|
|
return Util::getSupportedTracks(M, capa, type);
|
|
}
|
|
|
|
/// Automatically selects the tracks that are possible and/or wanted.
|
|
/// Returns true if the track selection changed in any way.
|
|
bool Output::selectDefaultTracks(){
|
|
if (!isInitialized){
|
|
initialize();
|
|
if (!isInitialized){return false;}
|
|
}
|
|
|
|
meta.refresh();
|
|
|
|
bool autoSeek = buffer.size();
|
|
uint64_t seekTarget = currentTime();
|
|
std::set<size_t> newSelects =
|
|
Util::wouldSelect(M, targetParams, capa, UA, autoSeek ? seekTarget : 0);
|
|
|
|
if (autoSeek){
|
|
std::set<size_t> toRemove;
|
|
for (std::set<size_t>::iterator it = newSelects.begin(); it != newSelects.end(); it++){
|
|
// autoSeeking and target not in bounds? Drop it too.
|
|
if (M.getLastms(*it) < std::max(seekTarget, (uint64_t)6000lu) - 6000){
|
|
toRemove.insert(*it);
|
|
}
|
|
}
|
|
// remove those from selectedtracks
|
|
for (std::set<size_t>::iterator it = toRemove.begin(); it != toRemove.end(); it++){
|
|
newSelects.erase(*it);
|
|
}
|
|
}
|
|
|
|
std::set<size_t> oldSelects;
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); ++it){
|
|
oldSelects.insert(it->first);
|
|
}
|
|
//No changes? Abort and return false;
|
|
if (oldSelects == newSelects){return false;}
|
|
|
|
//Temp set holding the differences between old and new track selections
|
|
std::set<size_t> diffs;
|
|
|
|
//Find elements in old selection but not in new selection
|
|
std::set_difference(oldSelects.begin(), oldSelects.end(), newSelects.begin(), newSelects.end(), std::inserter(diffs, diffs.end()));
|
|
if (diffs.size()){MEDIUM_MSG("Dropping %zu tracks", diffs.size());}
|
|
for (std::set<size_t>::iterator it = diffs.begin(); it != diffs.end(); it++){
|
|
HIGH_MSG("Dropping track %zu", *it);
|
|
userSelect.erase(*it);
|
|
}
|
|
|
|
//Find elements in new selection but not in old selection
|
|
std::set_difference(newSelects.begin(), newSelects.end(), oldSelects.begin(), oldSelects.end(), std::inserter(diffs, diffs.end()));
|
|
if (diffs.size()){MEDIUM_MSG("Adding %zu tracks", diffs.size());}
|
|
for (std::set<size_t>::iterator it = diffs.begin(); it != diffs.end(); it++){
|
|
HIGH_MSG("Adding track %zu", *it);
|
|
userSelect[*it].reload(streamName, *it);
|
|
if (!userSelect[*it]){
|
|
WARN_MSG("Could not select track %zu, dropping track", *it);
|
|
newSelects.erase(*it);
|
|
userSelect.erase(*it);
|
|
}
|
|
}
|
|
|
|
newSelects.clear();
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); ++it){
|
|
newSelects.insert(it->first);
|
|
}
|
|
|
|
//After attempting to add/remove tracks, now no changes? Abort and return false;
|
|
if (oldSelects == newSelects){return false;}
|
|
|
|
|
|
if (autoSeek){
|
|
INFO_MSG("Automatically seeking to position %" PRIu64 " to resume playback", seekTarget);
|
|
seek(seekTarget);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
/// Clears the buffer, sets parseData to false, and generally makes not very much happen at all.
|
|
void Output::stop(){
|
|
buffer.clear();
|
|
parseData = false;
|
|
}
|
|
|
|
///Returns the timestamp of the next upcoming keyframe after thisPacket, or 0 if that cannot be determined (yet).
|
|
uint64_t Output::nextKeyTime(){
|
|
DTSC::Keys keys(M.keys(getMainSelectedTrack()));
|
|
if (!keys.getValidCount()){return 0;}
|
|
size_t keyNum = keys.getNumForTime(lastPacketTime);
|
|
return keys.getTime(keyNum+1);
|
|
}
|
|
|
|
uint64_t Output::pageNumForKey(size_t trackId, size_t keyNum){
|
|
const Util::RelAccX &tPages = M.pages(trackId);
|
|
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
|
uint64_t pageNum = tPages.getInt("firstkey", i);
|
|
if (pageNum > keyNum) break;
|
|
uint64_t pageKeys = tPages.getInt("keycount", i);
|
|
if (keyNum > pageNum + pageKeys - 1) continue;
|
|
uint64_t pageAvail = tPages.getInt("avail", i);
|
|
return pageAvail == 0 ? INVALID_PAGE_NUM : pageNum;
|
|
}
|
|
return INVALID_PAGE_NUM;
|
|
}
|
|
|
|
/// Gets the highest page number available for the given trackId.
|
|
uint64_t Output::pageNumMax(size_t trackId){
|
|
const Util::RelAccX &tPages = M.pages(trackId);
|
|
uint64_t highest = 0;
|
|
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
|
|
uint64_t pageNum = tPages.getInt("firstkey", i);
|
|
if (pageNum > highest){highest = pageNum;}
|
|
}
|
|
return highest;
|
|
}
|
|
|
|
/// Loads the page for the given trackId and keyNum into memory.
|
|
/// Overwrites any existing page for the same trackId.
|
|
/// Automatically calls thisPacket.null() if necessary.
|
|
void Output::loadPageForKey(size_t trackId, size_t keyNum){
|
|
if (!M.trackValid(trackId)){
|
|
WARN_MSG("Load for track %zu key %zu aborted - track does not exist", trackId, keyNum);
|
|
return;
|
|
}
|
|
if (!M.trackLoaded(trackId)){meta.refresh();}
|
|
DTSC::Keys keys(M.keys(trackId));
|
|
if (!keys.getValidCount()){
|
|
WARN_MSG("Load for track %zu key %zu aborted - track is empty", trackId, keyNum);
|
|
return;
|
|
}
|
|
size_t lastAvailKey = keys.getEndValid() - 1;
|
|
if (meta.getVod() && keyNum > lastAvailKey){
|
|
INFO_MSG("Load for track %zu key %zu aborted, is > %zu", trackId, keyNum, lastAvailKey);
|
|
curPage.erase(trackId);
|
|
currentPage.erase(trackId);
|
|
return;
|
|
}
|
|
uint64_t micros = Util::getMicros();
|
|
VERYHIGH_MSG("Loading track %zu, containing key %zu", trackId, keyNum);
|
|
uint32_t timeout = 0;
|
|
uint64_t pageNum = pageNumForKey(trackId, keyNum);
|
|
while (keepGoing() && pageNum == INVALID_PAGE_NUM){
|
|
if (!timeout){HIGH_MSG("Requesting page with key %zu:%zu", trackId, keyNum);}
|
|
++timeout;
|
|
//Time out after 15 seconds
|
|
if (timeout > 300){
|
|
FAIL_MSG("Timeout while waiting for requested page %zu for track %zu. Aborting.", pageNum, trackId);
|
|
curPage.erase(trackId);
|
|
currentPage.erase(trackId);
|
|
return;
|
|
}
|
|
if (!userSelect.count(trackId) || !userSelect[trackId]){
|
|
WARN_MSG("Loading page for non-selected track %zu", trackId);
|
|
}else{
|
|
userSelect[trackId].setKeyNum(keyNum);
|
|
}
|
|
|
|
stats(true);
|
|
playbackSleep(50);
|
|
pageNum = pageNumForKey(trackId, keyNum);
|
|
}
|
|
|
|
if (!keepGoing()){
|
|
INFO_MSG("Aborting page load due to shutdown");
|
|
return;
|
|
}
|
|
|
|
if (!userSelect.count(trackId) || !userSelect[trackId]){
|
|
WARN_MSG("Loading page for non-selected track %zu", trackId);
|
|
}else{
|
|
userSelect[trackId].setKeyNum(keyNum);
|
|
}
|
|
|
|
stats(true);
|
|
|
|
if (currentPage.count(trackId) && currentPage[trackId] == pageNum){return;}
|
|
// If we're loading the track thisPacket is on, null it to prevent accesses.
|
|
if (thisPacket && thisIdx == trackId){thisPacket.null();}
|
|
char id[NAME_BUFFER_SIZE];
|
|
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackId, pageNum);
|
|
curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE);
|
|
if (!(curPage[trackId].mapped)){
|
|
FAIL_MSG("Initializing page %s failed", curPage[trackId].name.c_str());
|
|
currentPage.erase(trackId);
|
|
return;
|
|
}
|
|
currentPage[trackId] = pageNum;
|
|
micros = Util::getMicros(micros);
|
|
if (micros > 2000000){
|
|
INFO_MSG("Page %s loaded for %s in %.2fms", id, streamName.c_str(), micros/1000.0);
|
|
}else{
|
|
VERYHIGH_MSG("Page %s loaded for %s in %.2fms", id, streamName.c_str(), micros/1000.0);
|
|
}
|
|
}
|
|
|
|
/// Return the current time of the media buffer, or 0 if no buffer available.
|
|
uint64_t Output::currentTime(){
|
|
if (!buffer.size()){return 0;}
|
|
return buffer.begin()->time;
|
|
}
|
|
|
|
/// Return the start time of the selected tracks.
|
|
/// Returns the start time of earliest track if nothing is selected.
|
|
/// Returns zero if no tracks exist.
|
|
uint64_t Output::startTime(){
|
|
std::set<size_t> validTracks = M.getValidTracks();
|
|
if (!validTracks.size()){return 0;}
|
|
uint64_t start = 0xFFFFFFFFFFFFFFFFull;
|
|
if (userSelect.size()){
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
|
if (M.trackValid(it->first) && start > M.getFirstms(it->first)){
|
|
start = M.getFirstms(it->first);
|
|
}
|
|
}
|
|
}else{
|
|
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
|
|
if (start > M.getFirstms(*it)){start = M.getFirstms(*it);}
|
|
}
|
|
}
|
|
return start;
|
|
}
|
|
|
|
/// Return the end time of the selected tracks, or 0 if unknown or live.
|
|
/// Returns the end time of latest track if nothing is selected.
|
|
/// Returns zero if no tracks exist.
|
|
uint64_t Output::endTime(){
|
|
std::set<size_t> validTracks = M.getValidTracks();
|
|
if (!validTracks.size()){return 0;}
|
|
uint64_t end = 0;
|
|
if (userSelect.size()){
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
|
if (M.trackValid(it->first) && end < M.getLastms(it->first)){
|
|
end = meta.getLastms(it->first);
|
|
}
|
|
}
|
|
}else{
|
|
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
|
|
if (end < meta.getLastms(*it)){end = meta.getLastms(*it);}
|
|
}
|
|
}
|
|
return end;
|
|
}
|
|
|
|
/// Prepares all tracks from selectedTracks for seeking to the specified ms position.
|
|
void Output::seekKeyframesIn(unsigned long long pos, unsigned long long maxDelta){
|
|
sought = true;
|
|
if (!isInitialized){initialize();}
|
|
buffer.clear();
|
|
thisPacket.null();
|
|
MEDIUM_MSG("Seeking keyframes near %llums, max delta of %llu", pos, maxDelta);
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
|
if (!M.getValidTracks().count(it->first)){continue;}
|
|
uint64_t time = M.getTimeForKeyIndex(it->first, M.getKeyIndexForTime(it->first, pos));
|
|
uint64_t timeDelta = M.getTimeForKeyIndex(it->first, M.getKeyIndexForTime(it->first, pos + maxDelta));
|
|
if (time >= (pos - maxDelta)){
|
|
pos = time;
|
|
}else if (timeDelta >= (pos - maxDelta)){
|
|
pos = timeDelta;
|
|
}
|
|
seek(it->first, pos, false);
|
|
}
|
|
}
|
|
|
|
/// Prepares all tracks from selectedTracks for seeking to the specified ms position.
|
|
/// If toKey is true, clips the seek to the nearest keyframe if the main track is a video track.
|
|
void Output::seek(uint64_t pos, bool toKey){
|
|
sought = true;
|
|
if (!isInitialized){initialize();}
|
|
buffer.clear();
|
|
thisPacket.null();
|
|
if (toKey){
|
|
size_t mainTrack = getMainSelectedTrack();
|
|
if (mainTrack == INVALID_TRACK_ID){
|
|
WARN_MSG("Sync-seeking impossible (main track invalid); performing regular seek instead");
|
|
seek(pos);
|
|
return;
|
|
}
|
|
if (M.getType(mainTrack) == "video"){
|
|
DTSC::Keys keys(M.keys(mainTrack));
|
|
size_t keyNum = keys.getNumForTime(pos);
|
|
pos = keys.getTime(keyNum);
|
|
}
|
|
}
|
|
MEDIUM_MSG("Seeking to %" PRIu64 "ms (%s)", pos, toKey ? "sync" : "direct");
|
|
std::set<size_t> seekTracks;
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
|
seekTracks.insert(it->first);
|
|
}
|
|
for (std::set<size_t>::iterator it = seekTracks.begin(); it != seekTracks.end(); it++){
|
|
seek(*it, pos, false);
|
|
}
|
|
firstTime = Util::bootMS() - (currentTime() * realTime / 1000);
|
|
}
|
|
|
|
bool Output::seek(size_t tid, uint64_t pos, bool getNextKey){
|
|
if (!M.trackValid(tid)){
|
|
MEDIUM_MSG("Aborting seek to %" PRIu64 "ms in track %zu: Invalid track id.", pos, tid);
|
|
userSelect.erase(tid);
|
|
return false;
|
|
}
|
|
if (!M.trackLoaded(tid)){meta.refresh();}
|
|
if (!userSelect.count(tid) || !userSelect[tid]){
|
|
WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: user select failure (%s)", pos, tid, userSelect.count(tid)?"not connected":"not selected");
|
|
userSelect.erase(tid);
|
|
return false;
|
|
}
|
|
|
|
HIGH_MSG("Seeking for pos %" PRIu64, pos);
|
|
if (meta.getLive() && meta.getLastms(tid) < pos){
|
|
unsigned int maxTime = 0;
|
|
while (meta.getLastms(tid) < pos && myConn && ++maxTime <= 20){
|
|
Util::wait(500);
|
|
stats();
|
|
}
|
|
}
|
|
if (meta.getLastms(tid) < pos){
|
|
WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: past end of track (= %" PRIu64 "ms).",
|
|
pos, tid, meta.getLastms(tid));
|
|
userSelect.erase(tid);
|
|
return false;
|
|
}
|
|
DTSC::Keys keys(M.keys(tid));
|
|
size_t keyNum = keys.getNumForTime(pos);
|
|
uint64_t actualKeyTime = keys.getTime(keyNum);
|
|
HIGH_MSG("Seeking to track %zu key %zu => time %" PRIu64, tid, keyNum, pos);
|
|
if (actualKeyTime > pos){
|
|
if (M.getLive()){
|
|
WARN_MSG("Actually seeking to %" PRIu64 ", for %" PRIu64 " is not available any more",
|
|
actualKeyTime, pos);
|
|
pos = actualKeyTime;
|
|
userSelect[tid].setKeyNum(keyNum);
|
|
}
|
|
}
|
|
loadPageForKey(tid, keyNum + (getNextKey ? 1 : 0));
|
|
if (!curPage.count(tid) || !curPage[tid].mapped){
|
|
WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: not available.", pos, tid);
|
|
userSelect.erase(tid);
|
|
return false;
|
|
}
|
|
sortedPageInfo tmp;
|
|
tmp.tid = tid;
|
|
tmp.offset = 0;
|
|
tmp.partIndex = 0;
|
|
DTSC::Packet tmpPack;
|
|
tmpPack.reInit(curPage[tid].mapped + tmp.offset, 0, true);
|
|
tmp.time = tmpPack.getTime();
|
|
char *mpd = curPage[tid].mapped;
|
|
while (tmp.time < pos && tmpPack){
|
|
tmp.offset += tmpPack.getDataLen();
|
|
tmpPack.reInit(mpd + tmp.offset, 0, true);
|
|
tmp.time = tmpPack.getTime();
|
|
}
|
|
if (tmpPack){
|
|
HIGH_MSG("Sought to time %" PRIu64 " (yields a packet at %" PRIu64 "ms) in %s@%zu", tmp.time,
|
|
tmpPack.getTime(), streamName.c_str(), tid);
|
|
tmp.partIndex = M.getPartIndex(tmpPack.getTime(), tmp.tid);
|
|
buffer.insert(tmp);
|
|
return true;
|
|
}
|
|
// don't print anything for empty packets - not sign of corruption, just unfinished stream.
|
|
if (curPage[tid].mapped[tmp.offset] != 0){
|
|
FAIL_MSG("Noes! Couldn't find packet on track %zu because of some kind of corruption error "
|
|
"or somesuch.",
|
|
tid);
|
|
return false;
|
|
}
|
|
VERYHIGH_MSG("Track %zu no data (key %zu @ %" PRIu64 ") - waiting...", tid,
|
|
keyNum + (getNextKey ? 1 : 0), tmp.offset);
|
|
uint32_t i = 0;
|
|
while (!meta.getLive() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){
|
|
Util::wait(100 * i);
|
|
stats();
|
|
}
|
|
if (curPage[tid].mapped[tmp.offset]){return seek(tid, pos, getNextKey);}
|
|
FAIL_MSG("Track %zu no data (key %zu@%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset);
|
|
userSelect.erase(tid);
|
|
firstTime = Util::bootMS() - (buffer.begin()->time * realTime / 1000);
|
|
return false;
|
|
}
|
|
|
|
/// This function decides where in the stream initial playback starts.
|
|
/// The default implementation calls seek(0) for VoD.
|
|
/// For live, it seeks to the last sync'ed keyframe of the main track, no closer than
|
|
/// needsLookAhead+minKeepAway ms from the end. Unless lastms < 5000, then it seeks to the first
|
|
/// keyframe of the main track. Aborts if there is no main track or it has no keyframes.
|
|
void Output::initialSeek(){
|
|
if (!meta){return;}
|
|
uint64_t seekPos = 0;
|
|
if (meta.getLive()){
|
|
size_t mainTrack = getMainSelectedTrack();
|
|
if (mainTrack == INVALID_TRACK_ID){return;}
|
|
DTSC::Keys keys(M.keys(mainTrack));
|
|
if (!keys.getValidCount()){return;}
|
|
// seek to the newest keyframe, unless that is <5s, then seek to the oldest keyframe
|
|
uint32_t firstKey = keys.getFirstValid();
|
|
uint32_t lastKey = keys.getEndValid() - 1;
|
|
for (int64_t i = lastKey; i >= firstKey; i--){
|
|
seekPos = keys.getTime(i);
|
|
if (seekPos < 5000){continue;}// if we're near the start, skip back
|
|
bool good = true;
|
|
// check if all tracks have data for this point in time
|
|
for (std::map<size_t, Comms::Users>::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){
|
|
if (meta.getLastms(ti->first) < seekPos + needsLookAhead){
|
|
good = false;
|
|
break;
|
|
}
|
|
if (mainTrack == ti->first){continue;}// skip self
|
|
if (!M.trackValid(ti->first)){
|
|
HIGH_MSG("Skipping track %lu, not in tracks", ti->first);
|
|
continue;
|
|
}// ignore missing tracks
|
|
if (M.getLastms(ti->first) < seekPos + needsLookAhead + extraKeepAway + M.getMinKeepAway(ti->first)){
|
|
good = false;
|
|
break;
|
|
}
|
|
if (meta.getLastms(ti->first) == M.getFirstms(ti->first)){
|
|
HIGH_MSG("Skipping track %lu, last equals first", ti->first);
|
|
continue;
|
|
}// ignore point-tracks
|
|
if (meta.getLastms(ti->first) < seekPos){
|
|
good = false;
|
|
break;
|
|
}
|
|
HIGH_MSG("Track %zu is good", ti->first);
|
|
}
|
|
// if yes, seek here
|
|
if (good){break;}
|
|
}
|
|
}
|
|
/*LTS-START*/
|
|
if (isRecordingToFile){
|
|
if (M.getLive() &&
|
|
(targetParams.count("recstartunix") || targetParams.count("recstopunix"))){
|
|
uint64_t unixStreamBegin = Util::epoch() - endTime()/1000;
|
|
if (targetParams.count("recstartunix")){
|
|
uint64_t startUnix = atoll(targetParams["recstartunix"].c_str());
|
|
if (startUnix < unixStreamBegin){
|
|
WARN_MSG(
|
|
"Recording start time is earlier than stream begin - starting earliest possible");
|
|
targetParams["recstart"] = "-1";
|
|
}else{
|
|
targetParams["recstart"] = JSON::Value((startUnix - unixStreamBegin) * 1000).asString();
|
|
}
|
|
}
|
|
if (targetParams.count("recstopunix")){
|
|
uint64_t stopUnix = atoll(targetParams["recstopunix"].c_str());
|
|
if (stopUnix < unixStreamBegin){
|
|
onFail("Recording stop time is earlier than stream begin - aborting", true);
|
|
return;
|
|
}else{
|
|
targetParams["recstop"] = JSON::Value((stopUnix - unixStreamBegin) * 1000).asString();
|
|
}
|
|
}
|
|
}
|
|
if (targetParams.count("recstop")){
|
|
uint64_t endRec = atoll(targetParams["recstop"].c_str());
|
|
if (endRec < startTime()){
|
|
onFail("Entire recording range is in the past", true);
|
|
return;
|
|
}
|
|
INFO_MSG("Recording will stop at %" PRIu64, endRec);
|
|
}
|
|
if (targetParams.count("recstart") && atoll(targetParams["recstart"].c_str()) != 0){
|
|
size_t mainTrack = getMainSelectedTrack();
|
|
uint64_t startRec = atoll(targetParams["recstart"].c_str());
|
|
if (startRec > M.getLastms(mainTrack)){
|
|
if (M.getVod()){
|
|
onFail("Recording start past end of non-live source", true);
|
|
return;
|
|
}
|
|
uint64_t streamAvail = M.getLastms(mainTrack);
|
|
uint64_t lastUpdated = Util::getMS();
|
|
while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail && keepGoing()){
|
|
Util::sleep(500);
|
|
if (M.getLastms(mainTrack) > streamAvail){
|
|
stats();
|
|
streamAvail = M.getLastms(mainTrack);
|
|
lastUpdated = Util::getMS();
|
|
}
|
|
}
|
|
}
|
|
if (startRec < startTime()){
|
|
WARN_MSG("Record begin at %" PRIu64 " ms not available, starting at %" PRIu64
|
|
" ms instead",
|
|
startRec, startTime());
|
|
startRec = startTime();
|
|
}
|
|
INFO_MSG("Recording will start at %" PRIu64, startRec);
|
|
seekPos = startRec;
|
|
}
|
|
// Duration to record in seconds. Overrides recstop.
|
|
if (targetParams.count("split")){
|
|
long long endRec = atoll(targetParams["split"].c_str()) * 1000;
|
|
INFO_MSG("Will split recording every %lld seconds", atoll(targetParams["split"].c_str()));
|
|
targetParams["nxt-split"] = JSON::Value((int64_t)(seekPos + endRec)).asString();
|
|
}
|
|
if (targetParams.count("duration")){
|
|
long long endRec = atoll(targetParams["duration"].c_str()) * 1000;
|
|
targetParams["recstop"] = JSON::Value((int64_t)(seekPos + endRec)).asString();
|
|
}
|
|
if (targetParams.count("recstop")){
|
|
long long endRec = atoll(targetParams["recstop"].c_str());
|
|
if (endRec < 0 || endRec < startTime()){
|
|
onFail("Entire recording range is in the past", true);
|
|
return;
|
|
}
|
|
INFO_MSG("Recording will stop at %lld", endRec);
|
|
}
|
|
}else{
|
|
if (M.getLive() && targetParams.count("pushdelay")){
|
|
INFO_MSG("Converting pushdelay syntax into corresponding startunix+realtime options");
|
|
targetParams["startunix"] = std::string("-") + targetParams["pushdelay"];
|
|
targetParams["realtime"] = "1";
|
|
}
|
|
if (M.getLive() && (targetParams.count("startunix") || targetParams.count("stopunix"))){
|
|
uint64_t unixStreamBegin = Util::epoch() - endTime()/1000;
|
|
if (targetParams.count("startunix")){
|
|
int64_t startUnix = atoll(targetParams["startunix"].c_str());
|
|
if (startUnix < 0){
|
|
int64_t origStartUnix = startUnix;
|
|
startUnix += Util::epoch();
|
|
if (startUnix < unixStreamBegin){
|
|
INFO_MSG("Waiting for stream to reach intended starting point");
|
|
while (startUnix < Util::epoch() - (endTime() / 1000) && keepGoing()){
|
|
Util::wait(1000);
|
|
stats();
|
|
startUnix = origStartUnix + Util::epoch();
|
|
}
|
|
}
|
|
}
|
|
if (startUnix < unixStreamBegin){
|
|
WARN_MSG("Start time is earlier than stream begin - starting earliest possible");
|
|
WARN_MSG("%" PRId64 " < %" PRId64, startUnix, unixStreamBegin);
|
|
targetParams["start"] = "-1";
|
|
}else{
|
|
targetParams["start"] = JSON::Value((startUnix - unixStreamBegin) * 1000).asString();
|
|
}
|
|
}
|
|
if (targetParams.count("stopunix")){
|
|
int64_t stopUnix = atoll(targetParams["stopunix"].c_str());
|
|
if (stopUnix < 0){stopUnix += Util::epoch();}
|
|
if (stopUnix < unixStreamBegin){
|
|
onFail("Stop time is earlier than stream begin - aborting", true);
|
|
return;
|
|
}else{
|
|
targetParams["stop"] = JSON::Value((stopUnix - unixStreamBegin) * 1000).asString();
|
|
}
|
|
}
|
|
}
|
|
if (targetParams.count("stop")){
|
|
int64_t endRec = atoll(targetParams["stop"].c_str());
|
|
if (endRec < 0 || endRec < startTime()){
|
|
onFail("Entire range is in the past", true);
|
|
return;
|
|
}
|
|
INFO_MSG("Playback will stop at %" PRIu64, endRec);
|
|
}
|
|
if (targetParams.count("start") && atoll(targetParams["start"].c_str()) != 0){
|
|
size_t mainTrack = getMainSelectedTrack();
|
|
int64_t startRec = atoll(targetParams["start"].c_str());
|
|
if (startRec > M.getLastms(mainTrack)){
|
|
if (M.getVod()){
|
|
onFail("Playback start past end of non-live source", true);
|
|
return;
|
|
}
|
|
INFO_MSG("Waiting for stream to reach playback starting point");
|
|
int64_t streamAvail = M.getLastms(mainTrack);
|
|
int64_t lastUpdated = Util::getMS();
|
|
while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail && keepGoing()){
|
|
Util::sleep(500);
|
|
if (M.getLastms(mainTrack) > streamAvail){
|
|
stats();
|
|
streamAvail = M.getLastms(mainTrack);
|
|
lastUpdated = Util::getMS();
|
|
}
|
|
}
|
|
}
|
|
if (startRec < 0 || startRec < startTime()){
|
|
WARN_MSG("Playback begin at %" PRId64 " ms not available, starting at %" PRIu64
|
|
" ms instead",
|
|
startRec, startTime());
|
|
startRec = startTime();
|
|
}
|
|
INFO_MSG("Playback will start at %" PRIu64, startRec);
|
|
seekPos = startRec;
|
|
}
|
|
}
|
|
/*LTS-END*/
|
|
MEDIUM_MSG("Initial seek to %" PRIu64 "ms", seekPos);
|
|
seek(seekPos);
|
|
}
|
|
|
|
/// Returns the highest getMinKeepAway of all selected tracks
|
|
uint64_t Output::getMinKeepAway(){
|
|
uint64_t r = 0;
|
|
for (std::map<size_t, Comms::Users>::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){
|
|
if (ti->first == INVALID_TRACK_ID){continue;}
|
|
if (M.getMinKeepAway(ti->first) > r){r = M.getMinKeepAway(ti->first);}
|
|
}
|
|
//Limit the value to the maxKeepAway setting
|
|
//Also lowers extraKeepAway if needed
|
|
uint64_t maxKeepAway = M.getMaxKeepAway();
|
|
if (maxKeepAway){
|
|
if (r > maxKeepAway){r = maxKeepAway;}
|
|
if (r+extraKeepAway > maxKeepAway){extraKeepAway = maxKeepAway - r;}
|
|
}
|
|
return r;
|
|
}
|
|
|
|
/// This function attempts to forward playback in live streams to a more live point.
|
|
/// It seeks to the last sync'ed keyframe of the main track, no closer than needsLookAhead+minKeepAway ms from the end.
|
|
/// Aborts if not live, there is no main track or it has no keyframes.
|
|
bool Output::liveSeek(){
|
|
if (!realTime){return false;}//Makes no sense when playing in turbo mode
|
|
static uint32_t seekCount = 2;
|
|
uint64_t seekPos = 0;
|
|
if (!meta.getLive()){return false;}
|
|
size_t mainTrack = getMainSelectedTrack();
|
|
if (mainTrack == INVALID_TRACK_ID){return false;}
|
|
uint64_t lMs = meta.getLastms(mainTrack);
|
|
uint64_t cTime = thisPacket.getTime();
|
|
uint64_t mKa = getMinKeepAway();
|
|
if (!maxSkipAhead){
|
|
bool noReturn = false;
|
|
uint64_t newSpeed = 1000;
|
|
if (extraKeepAway > 0){extraKeepAway--;}//Reduce extra latency if possible
|
|
if (lMs - mKa - needsLookAhead - extraKeepAway > cTime + 50){
|
|
// We need to speed up!
|
|
uint64_t diff = (lMs - mKa - needsLookAhead - extraKeepAway) - cTime;
|
|
if (diff > 3000){
|
|
noReturn = true;
|
|
newSpeed = 1000;
|
|
}else if (diff > 1000){
|
|
newSpeed = 750;
|
|
}else if (diff > 500){
|
|
newSpeed = 900;
|
|
}else{
|
|
newSpeed = 950;
|
|
}
|
|
}
|
|
if (realTime != newSpeed){
|
|
HIGH_MSG("Changing playback speed from %" PRIu64 " to %" PRIu64 "(%" PRIu64 " ms LA, %" PRIu64 " ms mKA, %lu eKA)", realTime, newSpeed, needsLookAhead, mKa, extraKeepAway);
|
|
firstTime = Util::bootMS() - (cTime * newSpeed / 1000);
|
|
realTime = newSpeed;
|
|
}
|
|
if (!noReturn){return false;}
|
|
}
|
|
// cancel if there are no keys in the main track
|
|
if (mainTrack == INVALID_TRACK_ID){return false;}
|
|
DTSC::Keys mainKeys(meta.keys(mainTrack));
|
|
if (!mainKeys.getValidCount()){return false;}
|
|
|
|
for (size_t keyNum = mainKeys.getEndValid() - 1; keyNum >= mainKeys.getFirstValid(); keyNum--){
|
|
seekPos = mainKeys.getTime(keyNum);
|
|
// Only skip forward if we can win a decent amount (100ms)
|
|
if (seekPos <= cTime + 100 * seekCount){break;}
|
|
bool good = true;
|
|
// check if all tracks have data for this point in time
|
|
for (std::map<size_t, Comms::Users>::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){
|
|
if (ti->first == INVALID_TRACK_ID){
|
|
HIGH_MSG("Skipping track %zu, not in tracks", ti->first);
|
|
continue;
|
|
}// ignore missing tracks
|
|
if (meta.getLastms(ti->first) < seekPos + needsLookAhead + extraKeepAway + mKa){
|
|
good = false;
|
|
break;
|
|
}
|
|
if (mainTrack == ti->first){continue;}// skip self
|
|
if (meta.getLastms(ti->first) == meta.getFirstms(ti->first)){
|
|
HIGH_MSG("Skipping track %lu, last equals first", ti->first);
|
|
continue;
|
|
}// ignore point-tracks
|
|
HIGH_MSG("Track %lu is good", ti->first);
|
|
}
|
|
// if yes, seek here
|
|
if (good){
|
|
HIGH_MSG("Skipping forward %" PRIu64 "ms (%" PRIu64 " ms LA, %" PRIu64
|
|
" ms mKA, %lu eKA, > %" PRIu32 "ms)",
|
|
seekPos - cTime, needsLookAhead, mKa, extraKeepAway, seekCount * 100);
|
|
if (seekCount < 20){++seekCount;}
|
|
seek(seekPos);
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void Output::requestHandler(){
|
|
static bool firstData = true; // only the first time, we call onRequest if there's data buffered already.
|
|
if ((firstData && myConn.Received().size()) || myConn.spool()){
|
|
firstData = false;
|
|
DONTEVEN_MSG("onRequest");
|
|
onRequest();
|
|
lastRecv = Util::bootSecs();
|
|
}else{
|
|
if (!isBlocking && !parseData){
|
|
if (Util::bootSecs() - lastRecv > 300){
|
|
WARN_MSG("Disconnecting 5 minute idle connection");
|
|
onFail("Connection idle for 5 minutes");
|
|
}else{
|
|
Util::sleep(20);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Waits for the given amount of millis, increasing the realtime playback
|
|
/// related times as needed to keep smooth playback intact.
|
|
void Output::playbackSleep(uint64_t millis){
|
|
if (realTime && M.getLive()){
|
|
firstTime += millis;
|
|
extraKeepAway += millis;
|
|
}
|
|
Util::wait(millis);
|
|
}
|
|
|
|
/// Called right before sendNext(). Should return true if this is a stopping point.
|
|
bool Output::reachedPlannedStop(){
|
|
// If we're recording to file and reached the target position, stop
|
|
if (isRecordingToFile && targetParams.count("recstop") &&
|
|
atoll(targetParams["recstop"].c_str()) <= lastPacketTime){
|
|
INFO_MSG("End of planned recording reached");
|
|
return true;
|
|
}
|
|
// Regardless of playback method, if we've reached the wanted stop point, stop
|
|
if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) <= lastPacketTime){
|
|
INFO_MSG("End of planned playback reached");
|
|
return true;
|
|
}
|
|
// check if we need to split here
|
|
if (inlineRestartCapable() && targetParams.count("split")){
|
|
// Make sure that inlineRestartCapable outputs with splitting enabled only stop right before
|
|
// keyframes This works because this function is executed right BEFORE sendNext(), causing
|
|
// thisPacket to be the next packet in the newly splitted file.
|
|
if (!thisPacket.getFlag("keyframe")){return false;}
|
|
// is this a split point?
|
|
if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= lastPacketTime){
|
|
INFO_MSG("Split point reached");
|
|
return true;
|
|
}
|
|
}
|
|
// Otherwise, we're not stopping
|
|
return false;
|
|
}
|
|
|
|
/// \triggers
|
|
/// The `"CONN_OPEN"` trigger is stream-specific, and is ran when a connection is made or passed to a new handler. Its payload is:
|
|
/// ~~~~~~~~~~~~~~~
|
|
/// streamname
|
|
/// connected client host
|
|
/// output handler name
|
|
/// request URL (if any)
|
|
/// ~~~~~~~~~~~~~~~
|
|
/// The `"CONN_CLOSE"` trigger is stream-specific, and is ran when a connection closes. Its payload is:
|
|
/// ~~~~~~~~~~~~~~~
|
|
/// streamname
|
|
/// connected client host
|
|
/// output handler name
|
|
/// request URL (if any)
|
|
/// ~~~~~~~~~~~~~~~
|
|
int Output::run(){
|
|
/*LTS-START*/
|
|
// Connect to file target, if needed
|
|
if (isFileTarget()){
|
|
if (!streamName.size()){
|
|
WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["name"].asString().c_str());
|
|
onFail("Unconnected recording output", true);
|
|
return 2;
|
|
}
|
|
initialize();
|
|
if (!M.getValidTracks().size() || !userSelect.size() || !keepGoing()){
|
|
INFO_MSG("Stream not available - aborting");
|
|
onFail("Stream not available for recording", true);
|
|
return 3;
|
|
}
|
|
if (config->getString("target") == "-"){
|
|
parseData = true;
|
|
wantRequest = false;
|
|
if (!targetParams.count("realtime")){realTime = 0;}
|
|
INFO_MSG("Outputting %s to stdout with %s format", streamName.c_str(),
|
|
capa["name"].asString().c_str());
|
|
}else{
|
|
if (!connectToFile(config->getString("target"), targetParams.count("append"))){
|
|
onFail("Could not connect to the target for recording", true);
|
|
return 3;
|
|
}
|
|
parseData = true;
|
|
wantRequest = false;
|
|
if (!targetParams.count("realtime")){realTime = 0;}
|
|
INFO_MSG("Recording %s to %s with %s format", streamName.c_str(),
|
|
config->getString("target").c_str(), capa["name"].asString().c_str());
|
|
}
|
|
}
|
|
// Handle CONN_OPEN trigger, if needed
|
|
if (Triggers::shouldTrigger("CONN_OPEN", streamName)){
|
|
std::string payload =
|
|
streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n" + reqUrl;
|
|
if (!Triggers::doTrigger("CONN_OPEN", payload, streamName)){return 1;}
|
|
}
|
|
/*LTS-END*/
|
|
DONTEVEN_MSG("MistOut client handler started");
|
|
while (keepGoing() && (wantRequest || parseData)){
|
|
if (wantRequest){requestHandler();}
|
|
if (parseData){
|
|
if (!isInitialized){
|
|
initialize();
|
|
if (!isInitialized){
|
|
onFail("Stream initialization failed");
|
|
break;
|
|
}
|
|
}
|
|
if (!sentHeader && keepGoing()){
|
|
DONTEVEN_MSG("sendHeader");
|
|
sendHeader();
|
|
}
|
|
if (!sought){initialSeek();}
|
|
if (prepareNext()){
|
|
if (thisPacket){
|
|
lastPacketTime = thisPacket.getTime();
|
|
if (firstPacketTime == 0xFFFFFFFFFFFFFFFFull){firstPacketTime = lastPacketTime;}
|
|
|
|
// slow down processing, if real time speed is wanted
|
|
if (realTime){
|
|
uint8_t i = 6;
|
|
while (--i && thisPacket.getTime() > (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead) &&
|
|
keepGoing()){
|
|
uint64_t amount = thisPacket.getTime() - (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead);
|
|
if (amount > 1000){amount = 1000;}
|
|
Util::sleep(amount);
|
|
//Make sure we stay responsive to requests and stats while waiting
|
|
if (wantRequest){requestHandler();}
|
|
stats();
|
|
}
|
|
}
|
|
|
|
// delay the stream until metadata has caught up, if needed
|
|
if (needsLookAhead){
|
|
// we sleep in 20ms increments, or less if the lookahead time itself is less
|
|
uint32_t sleepTime = std::min(20ul, needsLookAhead);
|
|
// wait at most double the look ahead time, plus ten seconds
|
|
uint64_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000 / sleepTime);
|
|
uint64_t needsTime = thisPacket.getTime() + needsLookAhead;
|
|
bool firstTime = true;
|
|
while (--timeoutTries && keepGoing()){
|
|
bool lookReady = true;
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin();
|
|
it != userSelect.end(); it++){
|
|
if (meta.getLastms(it->first) <= needsTime){
|
|
if (timeoutTries == 1){
|
|
WARN_MSG("Track %zu: %" PRIu64 " <= %" PRIu64, it->first,
|
|
meta.getLastms(it->first), needsTime);
|
|
}
|
|
lookReady = false;
|
|
break;
|
|
}
|
|
}
|
|
if (lookReady){break;}
|
|
if (firstTime){
|
|
firstTime = false;
|
|
}else{
|
|
playbackSleep(sleepTime);
|
|
}
|
|
//Make sure we stay responsive to requests and stats while waiting
|
|
if (wantRequest){requestHandler();}
|
|
stats();
|
|
}
|
|
if (!timeoutTries){
|
|
WARN_MSG("Waiting for lookahead timed out - resetting lookahead!");
|
|
needsLookAhead = 0;
|
|
}
|
|
}
|
|
|
|
if (reachedPlannedStop()){
|
|
const char *origTarget = getenv("MST_ORIG_TARGET");
|
|
targetParams.erase("nxt-split");
|
|
if (inlineRestartCapable() && origTarget && !reachedPlannedStop()){
|
|
std::string newTarget = origTarget;
|
|
Util::streamVariables(newTarget, streamName);
|
|
if (newTarget.rfind('?') != std::string::npos){
|
|
newTarget.erase(newTarget.rfind('?'));
|
|
}
|
|
INFO_MSG("Switching to next push target filename: %s", newTarget.c_str());
|
|
if (!connectToFile(newTarget)){
|
|
FAIL_MSG("Failed to open file, aborting: %s", newTarget.c_str());
|
|
Util::logExitReason("failed to open file, aborting: %s", newTarget.c_str());
|
|
onFinish();
|
|
break;
|
|
}
|
|
uint64_t endRec = lastPacketTime + atoll(targetParams["split"].c_str()) * 1000;
|
|
targetParams["nxt-split"] = JSON::Value(endRec).asString();
|
|
sentHeader = false;
|
|
sendHeader();
|
|
}else{
|
|
if (!onFinish()){
|
|
INFO_MSG("Shutting down because planned stopping point reached");
|
|
Util::logExitReason("planned stopping point reached");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
sendNext();
|
|
}else{
|
|
Util::logExitReason("end of stream");
|
|
/*LTS-START*/
|
|
if (Triggers::shouldTrigger("CONN_STOP", streamName)){
|
|
std::string payload =
|
|
streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n";
|
|
Triggers::doTrigger("CONN_STOP", payload, streamName);
|
|
}
|
|
/*LTS-END*/
|
|
if (!onFinish()){break;}
|
|
}
|
|
}
|
|
if (!meta){
|
|
Util::logExitReason("lost internal connection to stream data");
|
|
break;
|
|
}
|
|
}
|
|
stats();
|
|
}
|
|
if (!config->is_active){Util::logExitReason("set inactive");}
|
|
if (!myConn){Util::logExitReason("connection closed");}
|
|
if (strncmp(Util::exitReason, "connection closed", 17) == 0){
|
|
MEDIUM_MSG("Client handler shutting down, exit reason: %s", Util::exitReason);
|
|
}else{
|
|
INFO_MSG("Client handler shutting down, exit reason: %s", Util::exitReason);
|
|
}
|
|
onFinish();
|
|
|
|
/*LTS-START*/
|
|
if (Triggers::shouldTrigger("CONN_CLOSE", streamName)){
|
|
std::string payload =
|
|
streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n" + reqUrl;
|
|
Triggers::doTrigger("CONN_CLOSE", payload, streamName);
|
|
}
|
|
if (isRecordingToFile && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName)){
|
|
uint64_t rightNow = Util::epoch();
|
|
std::stringstream payl;
|
|
payl << streamName << '\n';
|
|
payl << config->getString("target") << '\n';
|
|
payl << capa["name"].asStringRef() << '\n';
|
|
payl << myConn.dataUp() << '\n';
|
|
payl << (Util::bootSecs() - myConn.connTime()) << '\n';
|
|
payl << (rightNow - (Util::bootSecs() - myConn.connTime())) << '\n';
|
|
payl << rightNow << '\n';
|
|
if (firstPacketTime != 0xFFFFFFFFFFFFFFFFull){
|
|
payl << (lastPacketTime - firstPacketTime) << '\n';
|
|
}else{
|
|
payl << 0 << '\n';
|
|
}
|
|
payl << firstPacketTime << '\n';
|
|
payl << lastPacketTime << '\n';
|
|
Triggers::doTrigger("RECORDING_END", payl.str(), streamName);
|
|
}
|
|
/*LTS-END*/
|
|
|
|
stats(true);
|
|
if (statComm){statComm.setStatus(COMM_STATUS_DISCONNECT);}
|
|
|
|
userSelect.clear();
|
|
|
|
myConn.close();
|
|
return 0;
|
|
}
|
|
|
|
void Output::dropTrack(size_t trackId, const std::string &reason, bool probablyBad){
|
|
if (!userSelect.count(trackId)){return;}
|
|
// depending on whether this is probably bad and the current debug level, print a message
|
|
size_t printLevel = (probablyBad ? DLVL_WARN : DLVL_INFO);
|
|
const Comms::Users &usr = userSelect.at(trackId);
|
|
DEBUG_MSG(printLevel, "Dropping %s (%s) track %zu@k%zu (nextP=%zu, lastP=%zu): %s",
|
|
streamName.c_str(), meta.getCodec(trackId).c_str(), trackId, usr.getKeyNum() + 1,
|
|
pageNumForKey(trackId, usr.getKeyNum() + 1), pageNumMax(trackId), reason.c_str());
|
|
// now actually drop the track from the buffer
|
|
for (std::set<sortedPageInfo>::iterator it = buffer.begin(); it != buffer.end(); ++it){
|
|
if (it->tid == trackId){
|
|
buffer.erase(it);
|
|
break;
|
|
}
|
|
}
|
|
userSelect.erase(trackId);
|
|
}
|
|
|
|
/// Assumes at least one video track is selected.
|
|
/// Seeks back in the buffer to the newest keyframe with a timestamp less than the current
|
|
/// timestamp. Sets thisPacket to that frame, and then undoes the seek. The next call to
|
|
/// prepareNext continues as if this function was never called.
|
|
bool Output::getKeyFrame(){
|
|
// store copy of current state
|
|
std::set<sortedPageInfo> tmp_buffer = buffer;
|
|
std::map<size_t, Comms::Users> tmp_userSelect = userSelect;
|
|
std::map<size_t, size_t> tmp_currentPage = currentPage;
|
|
|
|
// reset the current packet to null, assuming failure
|
|
thisPacket.null();
|
|
|
|
// find the main track, check if it is video. Abort if not.
|
|
size_t mainTrack = getMainSelectedTrack();
|
|
if (M.getType(mainTrack) != "video"){return false;}
|
|
|
|
// we now know that mainTrack is a video track - let's do some work!
|
|
// first, we remove all selected tracks and the buffer. Then we select only the main track.
|
|
uint64_t currTime = currentTime();
|
|
buffer.clear();
|
|
userSelect.clear();
|
|
userSelect[mainTrack].reload(streamName, mainTrack);
|
|
// now, seek to the exact timestamp of the keyframe
|
|
DTSC::Keys keys(M.keys(mainTrack));
|
|
unsigned int targetKey = keys.getNumForTime(currTime);
|
|
seek(keys.getTime(targetKey));
|
|
// attempt to load the key into thisPacket
|
|
bool ret = prepareNext();
|
|
if (!ret){
|
|
WARN_MSG("Failed to load keyframe for %" PRIu64 "ms - continuing without it", currTime);
|
|
}
|
|
|
|
// restore state to before the seek/load
|
|
// most of these can simply be copied back...
|
|
buffer = tmp_buffer;
|
|
userSelect = tmp_userSelect;
|
|
// but the currentPage map must also load keys as needed
|
|
for (std::map<size_t, size_t>::iterator it = tmp_currentPage.begin(); it != tmp_currentPage.end(); ++it){
|
|
loadPageForKey(it->first, it->second);
|
|
}
|
|
// now we are back to normal and can return safely
|
|
return ret;
|
|
}
|
|
|
|
/// Attempts to prepare a new packet for output.
|
|
/// If it returns true and thisPacket evaluates to false, playback has completed.
|
|
/// Could be called repeatedly in a loop if you really really want a new packet.
|
|
/// \returns true if thisPacket was filled with the next packet.
|
|
/// \returns false if we could not reliably determine the next packet yet.
|
|
bool Output::prepareNext(){
|
|
static size_t emptyCount = 0;
|
|
if (!buffer.size()){
|
|
thisPacket.null();
|
|
INFO_MSG("Buffer completely played out");
|
|
return true;
|
|
}
|
|
// check if we have a next seek point for every track that is selected
|
|
if (buffer.size() != userSelect.size()){
|
|
INFO_MSG("Buffer/select mismatch: %zu/%zu - correcting", buffer.size(), userSelect.size());
|
|
std::set<size_t> dropTracks;
|
|
if (buffer.size() < userSelect.size()){
|
|
// prepare to drop any selectedTrack without buffer entry
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); ++it){
|
|
bool found = false;
|
|
for (std::set<sortedPageInfo>::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){
|
|
if (bi->tid == it->first){
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!found){dropTracks.insert(it->first);}
|
|
}
|
|
}else{
|
|
std::set<size_t> seen;
|
|
// prepare to drop any buffer entry without selectedTrack
|
|
for (std::set<sortedPageInfo>::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){
|
|
if (!userSelect.count(bi->tid)){dropTracks.insert(bi->tid);}
|
|
if (seen.count(bi->tid)){
|
|
INFO_MSG("Dropping duplicate buffer entry for track %zu", bi->tid);
|
|
buffer.erase(bi);
|
|
return false;
|
|
}
|
|
seen.insert(bi->tid);
|
|
}
|
|
}
|
|
if (!dropTracks.size()){
|
|
FAIL_MSG("Could not equalize tracks! This is very very very bad and I am now going to shut down to prevent worse.");
|
|
Util::logExitReason("Could not equalize tracks");
|
|
parseData = false;
|
|
config->is_active = false;
|
|
return false;
|
|
}
|
|
// actually drop what we found.
|
|
// if both of the above cases occur, the next prepareNext iteration will take care of that
|
|
for (std::set<size_t>::iterator it = dropTracks.begin(); it != dropTracks.end(); ++it){
|
|
dropTrack(*it, "seek/select mismatch");
|
|
}
|
|
return false;
|
|
}
|
|
|
|
sortedPageInfo nxt = *(buffer.begin());
|
|
|
|
if (!M.getValidTracks().count(nxt.tid)){
|
|
dropTrack(nxt.tid, "disappeared from metadata");
|
|
return false;
|
|
}
|
|
|
|
if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){
|
|
loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time));
|
|
nxt.offset = 0;
|
|
//Only read the next time if the page load succeeded and there is a packet to read from
|
|
if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){
|
|
nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0);
|
|
}
|
|
buffer.erase(buffer.begin());
|
|
buffer.insert(nxt);
|
|
return false;
|
|
}
|
|
|
|
// if we're going to read past the end of the data page, load the next page
|
|
// this only happens for VoD
|
|
if (nxt.offset >= curPage[nxt.tid].len ||
|
|
(!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){
|
|
if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){
|
|
dropTrack(nxt.tid, "end of VoD track reached", false);
|
|
return false;
|
|
}
|
|
dropTrack(nxt.tid, "VoD page load failure");
|
|
return false;
|
|
}
|
|
|
|
// We know this packet will be valid, pre-load it so we know its length
|
|
DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true);
|
|
|
|
uint64_t nextTime = 0;
|
|
|
|
DTSC::Keys keys(M.keys(nxt.tid));
|
|
size_t thisKey = keys.getNumForTime(nxt.time);
|
|
|
|
// Check if we have a next valid packet
|
|
if (curPage[nxt.tid].len > nxt.offset+preLoad.getDataLen()+20 && memcmp(curPage[nxt.tid].mapped + nxt.offset + preLoad.getDataLen(), "\000\000\000\000", 4)){
|
|
nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset + preLoad.getDataLen());
|
|
if (!nextTime){
|
|
WARN_MSG("Next packet is available (offset %" PRIu64 " / %" PRIu64 " on %s), but has no time. Please warn the developers if you see this message!", nxt.offset, curPage[nxt.tid].len, curPage[nxt.tid].name.c_str());
|
|
dropTrack(nxt.tid, "EOP: invalid next packet");
|
|
return false;
|
|
}
|
|
}else{
|
|
//no next packet yet!
|
|
//Check if this is the last packet of a VoD stream. Return success and drop the track.
|
|
if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){
|
|
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
|
|
thisIdx = nxt.tid;
|
|
dropTrack(nxt.tid, "end of VoD track reached", false);
|
|
return true;
|
|
}
|
|
//Check if there exists a different page for the next key
|
|
size_t nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);
|
|
if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){
|
|
// If so, the next key is our next packet
|
|
nextTime = keys.getTime(thisKey + 1);
|
|
}else{
|
|
//Okay, there's no next page yet, and no next packet on this page either.
|
|
//That means we're waiting for data to show up, somewhere.
|
|
// after ~25 seconds, give up and drop the track.
|
|
if (++emptyCount >= 2500){
|
|
dropTrack(nxt.tid, "EOP: data wait timeout");
|
|
return false;
|
|
}
|
|
//every ~1 second, check if the stream is not offline
|
|
if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){
|
|
Util::logExitReason("Stream source shut down");
|
|
thisPacket.null();
|
|
return true;
|
|
}
|
|
//every ~16 seconds, reconnect to metadata
|
|
if (emptyCount % 1600 == 0){
|
|
reconnect();
|
|
if (!meta){
|
|
onFail("Could not connect to stream data", true);
|
|
thisPacket.null();
|
|
return true;
|
|
}
|
|
// if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile.
|
|
if (!meta){
|
|
Util::logExitReason("Attempted reconnect to source failed");
|
|
thisPacket.null();
|
|
return true;
|
|
}
|
|
return false;//no sleep after reconnect
|
|
}
|
|
//Fine! We didn't want a packet, anyway. Let's try again later.
|
|
playbackSleep(10);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
//If the next packet should've been before the current packet, something is wrong. Abort, abort!
|
|
if (nextTime < nxt.time){
|
|
dropTrack(nxt.tid, "time going backwards");
|
|
return false;
|
|
}
|
|
|
|
// we've handled all special cases - at this point the packet should exist
|
|
// let's load it
|
|
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
|
|
thisIdx = nxt.tid;
|
|
// if it failed, drop the track and continue
|
|
if (!thisPacket){
|
|
dropTrack(nxt.tid, "packet load failure");
|
|
return false;
|
|
}
|
|
emptyCount = 0; // valid packet - reset empty counter
|
|
|
|
if (!userSelect[nxt.tid].isAlive()){
|
|
INFO_MSG("Track %zu is not alive!", nxt.tid);
|
|
return false;
|
|
}
|
|
|
|
// we assume the next packet is the next on this same page
|
|
nxt.offset += thisPacket.getDataLen();
|
|
nxt.time = nextTime;
|
|
++nxt.partIndex;
|
|
|
|
userSelect[nxt.tid].setKeyNum(thisKey);
|
|
|
|
// exchange the current packet in the buffer for the next one
|
|
buffer.erase(buffer.begin());
|
|
buffer.insert(nxt);
|
|
|
|
return true;
|
|
}
|
|
|
|
/// Returns the name as it should be used in statistics.
|
|
/// Outputs used as an input should return INPUT, outputs used for automation should return OUTPUT, others should return their proper name.
|
|
/// The default implementation is usually good enough for all the non-INPUT types.
|
|
std::string Output::getStatsName(){
|
|
if (isPushing()){return "INPUT:" + capa["name"].asStringRef();}
|
|
if (config->hasOption("target") && config->getString("target").size()){
|
|
return "OUTPUT:" + capa["name"].asStringRef();
|
|
}
|
|
return capa["name"].asStringRef();
|
|
}
|
|
|
|
void Output::stats(bool force){
|
|
// cancel stats update if not initialized
|
|
if (!isInitialized){return;}
|
|
// also cancel if it has been less than a second since the last update
|
|
// unless force is set to true
|
|
uint64_t now = Util::bootSecs();
|
|
if (now == lastStats && !force){return;}
|
|
|
|
if (isRecording()){
|
|
static uint64_t lastPushUpdate = now;
|
|
if (lastPushUpdate + 5 <= now){
|
|
JSON::Value pStat;
|
|
pStat["push_status_update"]["id"] = getpid();
|
|
JSON::Value & pData = pStat["push_status_update"]["status"];
|
|
pData["mediatime"] = currentTime();
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
|
pData["tracks"].append(it->first);
|
|
}
|
|
pData["bytes"] = myConn.dataUp();
|
|
pData["active_seconds"] = (now - myConn.connTime());
|
|
Socket::UDPConnection uSock;
|
|
uSock.SetDestination("localhost", 4242);
|
|
uSock.SendNow(pStat.toString());
|
|
lastPushUpdate = now;
|
|
}
|
|
}
|
|
|
|
if (!statComm){statComm.reload();}
|
|
if (!statComm){return;}
|
|
|
|
lastStats = now;
|
|
|
|
HIGH_MSG("Writing stats: %s, %s, %u, %lu, %lu", getConnectedHost().c_str(), streamName.c_str(),
|
|
crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown());
|
|
/*LTS-START*/
|
|
if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){
|
|
onFail("Shutting down on controller request");
|
|
return;
|
|
}
|
|
/*LTS-END*/
|
|
statComm.setNow(now);
|
|
statComm.setHost(getConnectedBinHost());
|
|
statComm.setCRC(crc);
|
|
statComm.setStream(streamName);
|
|
statComm.setConnector(getStatsName());
|
|
statComm.setUp(myConn.dataUp());
|
|
statComm.setDown(myConn.dataDown());
|
|
statComm.setTime(now - myConn.connTime());
|
|
statComm.setLastSecond(thisPacket ? thisPacket.getTime() : 0);
|
|
statComm.setPid(getpid());
|
|
statComm.keepAlive();
|
|
|
|
/*LTS-START*/
|
|
// Tag the session with the user agent
|
|
static bool newUA = true; // we only do this once per connection
|
|
if (newUA && ((now - myConn.connTime()) >= uaDelay || !myConn) && UA.size()){
|
|
std::string APIcall =
|
|
"{\"tag_sessid\":{\"" + statComm.getSessId() + "\":" + JSON::string_escape("UA:" + UA) + "}}";
|
|
Socket::UDPConnection uSock;
|
|
uSock.SetDestination("localhost", 4242);
|
|
uSock.SendNow(APIcall);
|
|
newUA = false;
|
|
}
|
|
/*LTS-END*/
|
|
|
|
doSync();
|
|
|
|
if (isPushing()){
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
|
it->second.keepAlive();
|
|
if (it->second.getStatus() == COMM_STATUS_REQDISCONNECT){
|
|
if (dropPushTrack(it->second.getTrack(), "disconnect request from buffer")){break;}
|
|
}
|
|
if (!it->second.isAlive()){
|
|
if (dropPushTrack(it->second.getTrack(), "track mapping no longer valid")){break;}
|
|
}
|
|
//if (Util::bootSecs() - M.getLastUpdated(it->first) > 5){
|
|
// if (dropPushTrack(it->second.getTrack(), "track updates being ignored by buffer")){break;}
|
|
//}
|
|
}
|
|
}else{
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
|
it->second.keepAlive();
|
|
}
|
|
}
|
|
}
|
|
|
|
bool Output::dropPushTrack(uint32_t trackId, const std::string & dropReason){
|
|
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
|
|
if (it->second.getTrack() == trackId){
|
|
WARN_MSG("Dropping input track %" PRIu32 ": %s", trackId, dropReason.c_str());
|
|
userSelect.erase(it);
|
|
return true;
|
|
break;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void Output::onRequest(){
|
|
// simply clear the buffer, we don't support any kind of input by default
|
|
myConn.Received().clear();
|
|
wantRequest = false;
|
|
}
|
|
|
|
void Output::sendHeader(){
|
|
// just set the sentHeader bool to true, by default
|
|
sentHeader = true;
|
|
}
|
|
|
|
bool Output::connectToFile(std::string file, bool append){
|
|
int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
|
|
int mode = O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC);
|
|
if (!Util::createPathFor(file)){
|
|
ERROR_MSG("Cannot not create file %s: could not create parent folder", file.c_str());
|
|
return false;
|
|
}
|
|
int outFile = open(file.c_str(), mode, flags);
|
|
if (outFile < 0){
|
|
ERROR_MSG("Failed to open file %s, error: %s", file.c_str(), strerror(errno));
|
|
return false;
|
|
}
|
|
|
|
int r = dup2(outFile, myConn.getSocket());
|
|
if (r == -1){
|
|
ERROR_MSG("Failed to create an alias for the socket using dup2: %s.", strerror(errno));
|
|
return false;
|
|
}
|
|
close(outFile);
|
|
isRecordingToFile = true;
|
|
return true;
|
|
}
|
|
|
|
/// Checks if the set streamName allows pushes from this connector/IP/password combination.
|
|
/// Runs all appropriate triggers and checks.
|
|
/// Returns true if the push should continue, false otherwise.
|
|
bool Output::allowPush(const std::string &passwd){
|
|
pushing = true;
|
|
std::string strmSource;
|
|
|
|
// Initialize the stream source if needed, connect to it
|
|
waitForStreamPushReady();
|
|
// pull the source setting from metadata
|
|
if (meta){strmSource = meta.getSource();}
|
|
|
|
if (!strmSource.size()){
|
|
FAIL_MSG("Push rejected - stream %s not configured or unavailable", streamName.c_str());
|
|
pushing = false;
|
|
return false;
|
|
}
|
|
if (strmSource.substr(0, 7) != "push://"){
|
|
FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)",
|
|
streamName.c_str(), strmSource.c_str());
|
|
pushing = false;
|
|
return false;
|
|
}
|
|
|
|
std::string source = strmSource.substr(7);
|
|
std::string IP = source.substr(0, source.find('@'));
|
|
|
|
/*LTS-START*/
|
|
std::string password;
|
|
if (source.find('@') != std::string::npos){
|
|
password = source.substr(source.find('@') + 1);
|
|
if (password != ""){
|
|
if (password == passwd){
|
|
INFO_MSG("Password accepted - ignoring IP settings.");
|
|
IP = "";
|
|
}else{
|
|
INFO_MSG("Password rejected - checking IP.");
|
|
if (IP == ""){IP = "deny-all.invalid";}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (Triggers::shouldTrigger("STREAM_PUSH", streamName)){
|
|
std::string payload = streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n" + reqUrl;
|
|
if (!Triggers::doTrigger("STREAM_PUSH", payload, streamName)){
|
|
WARN_MSG("Push from %s rejected by STREAM_PUSH trigger", getConnectedHost().c_str());
|
|
pushing = false;
|
|
return false;
|
|
}
|
|
}
|
|
/*LTS-END*/
|
|
|
|
if (IP != ""){
|
|
if (!myConn.isAddress(IP)){
|
|
WARN_MSG("Push from %s rejected; not whitelisted", getConnectedHost().c_str());
|
|
pushing = false;
|
|
return false;
|
|
}
|
|
}
|
|
initialize();
|
|
return true;
|
|
}
|
|
|
|
/// Attempts to wait for a stream to finish shutting down if it is, then restarts and reconnects.
|
|
void Output::waitForStreamPushReady(){
|
|
uint8_t streamStatus = Util::getStreamStatus(streamName);
|
|
MEDIUM_MSG("Current status for %s buffer is %u", streamName.c_str(), streamStatus);
|
|
if (streamStatus == STRMSTAT_READY){
|
|
reconnect();
|
|
std::set<size_t> vTracks = M.getValidTracks(true);
|
|
INFO_MSG("Stream already active (%zu valid tracks) - check if it's not shutting down...", vTracks.size());
|
|
uint64_t oneTime = 0;
|
|
uint64_t twoTime = 0;
|
|
for (std::set<size_t>::iterator it = vTracks.begin(); it != vTracks.end(); ++it){
|
|
if (M.getLastms(*it) > oneTime){oneTime = M.getLastms(*it);}
|
|
}
|
|
Util::wait(2000);
|
|
for (std::set<size_t>::iterator it = vTracks.begin(); it != vTracks.end(); ++it){
|
|
if (M.getLastms(*it) > twoTime){twoTime = M.getLastms(*it);}
|
|
}
|
|
if (twoTime <= oneTime+500){
|
|
disconnect();
|
|
INFO_MSG("Waiting for stream reset before attempting push input accept (%" PRIu64 " <= %" PRIu64 "+500)", twoTime, oneTime);
|
|
while (streamStatus != STRMSTAT_OFF && keepGoing()){
|
|
userSelect.clear();
|
|
Util::wait(1000);
|
|
streamStatus = Util::getStreamStatus(streamName);
|
|
}
|
|
reconnect();
|
|
}
|
|
}
|
|
while (((streamStatus != STRMSTAT_WAIT && streamStatus != STRMSTAT_READY) || !meta) && keepGoing()){
|
|
INFO_MSG("Waiting for %s buffer to be ready... (%u)", streamName.c_str(), streamStatus);
|
|
disconnect();
|
|
userSelect.clear();
|
|
Util::wait(1000);
|
|
streamStatus = Util::getStreamStatus(streamName);
|
|
if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){
|
|
INFO_MSG("Reconnecting to %s buffer... (%u)", streamName.c_str(), streamStatus);
|
|
Util::wait(500);
|
|
reconnect();
|
|
streamStatus = Util::getStreamStatus(streamName);
|
|
}
|
|
}
|
|
if (streamStatus == STRMSTAT_READY || streamStatus == STRMSTAT_WAIT){reconnect();}
|
|
if (!meta){
|
|
onFail("Could not connect to stream data", true);
|
|
}
|
|
}
|
|
|
|
void Output::selectAllTracks(){
|
|
std::set<size_t> tracks = getSupportedTracks();
|
|
for (std::set<size_t>::iterator it = tracks.begin(); it != tracks.end(); it++){
|
|
userSelect[*it].reload(streamName, *it);
|
|
}
|
|
}
|
|
}// namespace Mist
|