Generalize DTSH header reading and writing; generalize input override prefixes; support external writer targets for pushing

This commit is contained in:
Thulinma 2023-02-17 01:13:29 +01:00
parent 2b18a414b4
commit 0f692233e8
26 changed files with 193 additions and 146 deletions

View file

@ -785,6 +785,28 @@ void Util::Config::addStandardPushCapabilities(JSON::Value &cap){
pp["append"]["format"] = "set_or_unset";
pp["append"]["sort"] = "bf";
pp["split"]["name"] = "Split interval";
pp["split"]["help"] = "Performs a gapless restart of the recording every this many seconds. Always aligns to the next keyframe after this duration, to ensure each recording is fully playable. When set to zero (the default) will not split at all.";
pp["split"]["type"] = "int";
pp["split"]["unit"] = "s";
pp["split"]["sort"] = "bh";
pp["m3u8"]["name"] = "Playlist path (relative to segments)";
pp["m3u8"]["help"] = "If set, will write a m3u8 playlist file for the segments to the given path (relative from the first segment path). When this parameter is used, at least one of the variables $segmentCounter or $currentMediaTime must be part of the segment path (to keep segments from overwriting each other). The \"Split interval\" parameter will default to 60 seconds when using this option.";
pp["m3u8"]["type"] = "string";
pp["m3u8"]["sort"] = "apa";
pp["targetAge"]["name"] = "Playlist target age";
pp["targetAge"]["help"] = "When writing a playlist, delete segment entries that are more than this many seconds old from the playlist (and, if possible, also delete said segments themselves). When set to 0 or left empty, does not delete.";
pp["targetAge"]["type"] = "int";
pp["targetAge"]["unit"] = "s";
pp["targetAge"]["sort"] = "apb";
pp["maxEntries"]["name"] = "Playlist max entries";
pp["maxEntries"]["help"] = "When writing a playlist, delete oldest segment entries once this entry count has been reached (and, if possible, also delete said segments themselves). When set to 0 or left empty, does not delete.";
pp["maxEntries"]["type"] = "int";
pp["maxEntries"]["sort"] = "apc";
pp["pushdelay"]["name"] = "Push delay";
pp["pushdelay"]["help"] = "Ensures the stream is always delayed by at least this many seconds. Internally overrides the \"realtime\" and \"start\" parameters";
pp["pushdelay"]["type"] = "int";
@ -793,12 +815,6 @@ void Util::Config::addStandardPushCapabilities(JSON::Value &cap){
pp["pushdelay"]["disable"].append("start");
pp["pushdelay"]["sort"] = "bg";
pp["split"]["name"] = "Split interval";
pp["split"]["help"] = "Performs a gapless restart of the recording every this may seconds. Always aligns to the next keyframe after this duration, to ensure each recording is fully playable";
pp["split"]["type"] = "int";
pp["split"]["unit"] = "s";
pp["split"]["sort"] = "bh";
pp["duration"]["name"] = "Duration of push";
pp["duration"]["help"] = "How much media time to push, in seconds. Internally overrides \"recstop\"";
pp["duration"]["type"] = "int";

View file

@ -12,6 +12,7 @@
#include "procs.h"
#include "shared_memory.h"
#include "socket.h"
#include "url.h"
#include "stream.h"
#include "triggers.h" //LTS
#include <semaphore.h>
@ -673,6 +674,25 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider)
for (unsigned int i = 0; i < input_size; ++i){
DTSC::Scan tmp_input = inputs.getIndice(i);
// if name prefix based match, always force 99 priority
if (tmp_input.getMember("name")){
std::string inPrefix = tmp_input.getMember("name").asString() + ":";
if (tmpFn.size() > inPrefix.size()){
Util::stringToLower(inPrefix);
std::string fnPrefix = tmpFn.substr(0, inPrefix.size());
Util::stringToLower(fnPrefix);
if (inPrefix == fnPrefix){
if (tmp_input.getMember("non-provider") && !isProvider){
noProviderNoPick = true;
continue;
}
curPrio = 99;
selected = true;
input = tmp_input;
}
}
}
// if match voor current stream && priority is hoger dan wat we al hebben
if (tmp_input.getMember("source_match") && curPrio < tmp_input.getMember("priority").asInt()){
if (tmp_input.getMember("source_match").getSize()){
@ -769,18 +789,43 @@ pid_t Util::startPush(const std::string &streamname, std::string &target, int de
std::string back = tar_match.substr(tar_match.find('*') + 1);
MEDIUM_MSG("Checking output %s: %s (%s)", outputs.getIndiceName(i).c_str(),
output.getMember("name").asString().c_str(), checkTarget.c_str());
if (checkTarget.substr(0, front.size()) == front &&
checkTarget.substr(checkTarget.size() - back.size()) == back){
output_bin = Util::getMyPath() + "MistOut" + output.getMember("name").asString();
break;
}
//Check for external writer support
if (front == "/" && back.size() && checkTarget.substr(checkTarget.size() - back.size()) == back){
HTTP::URL tUri(target);
// If it is a remote target, we might need to spawn an external binary
if (tUri.isLocalPath()){continue;}
// Read configured external writers
IPC::sharedPage extwriPage(EXTWRITERS, 0, false, false);
if (extwriPage.mapped){
Util::RelAccX extWri(extwriPage.mapped, false);
if (extWri.isReady()){
for (uint64_t i = 0; i < extWri.getEndPos(); i++){
Util::RelAccX protocols = Util::RelAccX(extWri.getPointer("protocols", i));
uint8_t protocolCount = protocols.getPresent();
JSON::Value protocolArray;
for (uint8_t idx = 0; idx < protocolCount; idx++){
if (tUri.protocol == protocols.getPointer("protocol", idx)){
output_bin = Util::getMyPath() + "MistOut" + output.getMember("name").asString();
break;
}
if (output_bin.size()){break;}
}
if (output_bin.size()){break;}
}
}
}
}
}
}
}
}
if (output_bin == ""){
if (!output_bin.size()){
FAIL_MSG("No output found for target %s, aborting push.", target.c_str());
return 0;
}

View file

@ -282,6 +282,17 @@ namespace Controller{
WARN_MSG("Input %s version mismatch (%s != " PACKAGE_VERSION ")", entryName.c_str(),
capabilities["inputs"][entryName]["version"].asStringRef().c_str());
capabilities["inputs"].removeMember(entryName);
}else{
JSON::Value & inRef = capabilities["inputs"][entryName];
if (inRef.isMember("source_match") && inRef.isMember("name")){
if (!inRef["source_match"].isArray()){
std::string m = inRef["source_match"].asString();
inRef["source_match"].append(m);
}
std::string n = inRef["name"].asString();
Util::stringToLower(n);
inRef["source_match"].append(n+":*");
}
}
}
}

View file

@ -420,33 +420,10 @@ namespace Controller{
return ret;
}
bool isMatch(const std::string &source, const std::string &match){
std::string front = match.substr(0, match.find('*'));
std::string back = match.substr(match.find('*') + 1);
// if the length of the source is smaller than the front and back matching parts together, it can never match
if (source.size() < front.size() + back.size()){return false;}
return (source.substr(0, front.size()) == front && source.substr(source.size() - back.size()) == back);
}
void checkParameters(JSON::Value &streamObj){
JSON::Value &inpt = Controller::capabilities["inputs"];
std::string match;
jsonForEach(inpt, it){
if ((*it)["source_match"].isArray()){
jsonForEach((*it)["source_match"], subIt){
if (isMatch(streamObj["source"].asStringRef(), (*subIt).asStringRef())){
match = (*it)["name"].asString();
}
}
}
if ((*it)["source_match"].isString()){
if (isMatch(streamObj["source"].asStringRef(), (*it)["source_match"].asStringRef())){
match = (*it)["name"].asString();
}
}
}
if (match != ""){
jsonForEach(inpt[match]["hardcoded"], it){streamObj[it.key()] = *it;}
JSON::Value in = Util::getInputBySource(streamObj["source"].asStringRef(), true);
if (in){
jsonForEach(in["hardcoded"], it){streamObj[it.key()] = *it;}
}
}

View file

@ -309,6 +309,45 @@ namespace Mist{
INFO_MSG("Input booting");
//Check if the input uses the name-based-override, and strip it
{
std::string input = config->getString("input");
std::string prefix = capa["name"].asStringRef() + ":";
Util::stringToLower(prefix);
if (input.size() > prefix.size()){
std::string match = input.substr(0, prefix.size());
Util::stringToLower(match);
if (prefix == match){
//We have a prefix match - make sure we don't _also_ have a proper source_match
bool source_match = false;
if (capa["source_match"].size()){
jsonForEach(capa["source_match"], it){
const std::string & source = it->asStringRef();
std::string front = source.substr(0, source.find('*'));
std::string back = source.substr(source.find('*') + 1);
if (input.size() > front.size()+back.size() && input.substr(0, front.size()) == front && input.substr(input.size() - back.size()) == back){
source_match = true;
break;
}
}
}else{
const std::string & source = capa["source_match"].asStringRef();
std::string front = source.substr(0, source.find('*'));
std::string back = source.substr(source.find('*') + 1);
if (input.size() > front.size()+back.size() && input.substr(0, front.size()) == front && input.substr(input.size() - back.size()) == back){
source_match = true;
}
}
//Only if no source_match, strip the prefix from the input string
if (!source_match){
config->getOption("input", true).append(input.substr(prefix.size()));
}
}
}
}
if (!checkArguments()){
FAIL_MSG("Setup failed - exiting");
return 0;
@ -505,16 +544,19 @@ namespace Mist{
Comms::sessionConfigCache();
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_BOOT;}
checkHeaderTimes(config->getString("input"));
//needHeader internally calls readExistingHeader which in turn attempts to read header cache
if (needHeader()){
uint64_t timer = Util::bootMS();
bool headerSuccess = readHeader();
if (!headerSuccess || (!M && needsLock())){
uint64_t timer = Util::getMicros();
if (!readHeader() || (!M && needsLock())){
FAIL_MSG("Reading header for '%s' failed.", config->getString("input").c_str());
return 0;
}
timer = Util::bootMS() - timer;
INFO_MSG("Read header in %" PRIu64 "ms (%zu tracks)", timer, M?M.trackCount():(size_t)0);
timer = Util::getMicros(timer);
INFO_MSG("Created header in %.3f ms (%zu tracks)", (double)timer/1000.0, M?M.trackCount():(size_t)0);
//Write header to file for caching purposes
M.toFile(config->getString("input") + ".dtsh");
}
postHeader();
if (config->getBool("headeronly")){return 0;}
if (M && M.getVod()){
meta.removeEmptyTracks();
@ -1089,10 +1131,12 @@ namespace Mist{
return r.str();
}
/// Attempts to create a header.
/// Returns true on success.
/// Default implementation fails and prints a warning.
bool Input::readHeader(){
INFO_MSG("Empty header created by default readHeader handler");
meta.reInit(streamName);
return true;
WARN_MSG("Default readHeader implementation called - this is not expected to happen");
return false;
}
void Input::parseHeader(){
@ -1476,25 +1520,18 @@ namespace Mist{
}
bool Input::readExistingHeader(){
if (config->getBool("realtime")){
meta.reInit("", config->getString("input") + ".dtsh");
if (!meta){return false;}
if (meta.version != DTSH_VERSION){
INFO_MSG("Updating wrong version header file from version %u to %u", meta.version, DTSH_VERSION);
return false;
}
return meta;
}
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, config->getString("streamname").c_str());
IPC::sharedPage sp(pageName, 0, false, false);
if (sp){
sp.close();
meta.reInit(config->getString("streamname"), false);
if (meta){
meta.setMaster(true);
INFO_MSG("Read existing header");
return true;
if (!config->getBool("realtime")){
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, config->getString("streamname").c_str());
IPC::sharedPage sp(pageName, 0, false, false);
if (sp){
sp.close();
meta.reInit(config->getString("streamname"), false);
if (meta){
meta.setMaster(true);
INFO_MSG("Read existing header");
return true;
}
}
}
// Try to read any existing DTSH file
@ -1509,7 +1546,7 @@ namespace Mist{
if (!fileSize){return false;}
DTSC::Packet pkt(scanBuf, fileSize, true);
HIGH_MSG("Retrieved header of %lu bytes", fileSize);
meta.reInit(streamName, pkt.getScan());
meta.reInit(config->getBool("realtime") ? "" : streamName, pkt.getScan());
if (meta.version != DTSH_VERSION){
INFO_MSG("Updating wrong version header file from version %u to %u", meta.version, DTSH_VERSION);

View file

@ -40,6 +40,7 @@ namespace Mist{
virtual bool checkArguments() = 0;
virtual bool readHeader();
virtual bool needHeader(){return !readExistingHeader();}
virtual void postHeader(){};
virtual bool preRun(){return true;}
virtual bool isThread(){return false;}
virtual bool isSingular(){return !config->getBool("realtime");}

View file

@ -204,8 +204,6 @@ namespace Mist{
if (!inFile.seek(0))
ERROR_MSG("Could not seek back to position 0!");
thisTime = 0;
M.toFile(config->getString("input") + ".dtsh");
return true;
}

View file

@ -143,7 +143,7 @@ namespace Mist{
meta.setType(idx, "audio");
meta.setRate(idx, strm->codecpar->sample_rate);
meta.setSize(idx, strm->codecpar->frame_size);
meta.setChannels(idx, strm->codecpar->channels);
meta.setChannels(idx, strm->codecpar->ch_layout.nb_channels);
}
}

View file

@ -9,7 +9,6 @@ namespace Mist{
protected:
bool checkArguments(){return false;};
bool readHeader(){return false;};
bool needHeader(){return false;};
};
}// namespace Mist

View file

@ -31,7 +31,6 @@ namespace Mist{
bool preRun();
bool checkArguments(){return true;}
void updateMeta();
bool readHeader(){return false;}
bool needHeader(){return false;}
void getNext(size_t idx = INVALID_TRACK_ID){};
void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID){};

View file

@ -237,39 +237,35 @@ namespace Mist{
bool inputDTSC::readHeader(){
if (!F){return false;}
if (!readExistingHeader()){
size_t moreHeader = 0;
do{
// read existing header from file here?
char hdr[8];
fseek(F, moreHeader, SEEK_SET);
if (fread(hdr, 8, 1, F) != 1){
FAIL_MSG("Could not read header @ bpos %zu", moreHeader);
return false;
}
if (memcmp(hdr, DTSC::Magic_Header, 4)){
FAIL_MSG("File does not have a DTSC header @ bpos %zu", moreHeader);
return false;
}
size_t pktLen = Bit::btohl(hdr + 4);
char *pkt = (char *)malloc(8 + pktLen * sizeof(char));
fseek(F, moreHeader, SEEK_SET);
if (fread(pkt, 8 + pktLen, 1, F) != 1){
free(pkt);
FAIL_MSG("Could not read packet @ bpos %zu", moreHeader);
}
DTSC::Scan S(pkt + 8, pktLen);
if (S.hasMember("moreheader") && S.getMember("moreheader").asInt()){
moreHeader = S.getMember("moreheader").asInt();
}else{
moreHeader = 0;
meta.reInit(isSingular() ? streamName : "", S);
}
size_t moreHeader = 0;
do{
char hdr[8];
fseek(F, moreHeader, SEEK_SET);
if (fread(hdr, 8, 1, F) != 1){
FAIL_MSG("Could not read header @ bpos %zu", moreHeader);
return false;
}
if (memcmp(hdr, DTSC::Magic_Header, 4)){
FAIL_MSG("File does not have a DTSC header @ bpos %zu", moreHeader);
return false;
}
size_t pktLen = Bit::btohl(hdr + 4);
char *pkt = (char *)malloc(8 + pktLen * sizeof(char));
fseek(F, moreHeader, SEEK_SET);
if (fread(pkt, 8 + pktLen, 1, F) != 1){
free(pkt);
}while (moreHeader);
}
FAIL_MSG("Could not read packet @ bpos %zu", moreHeader);
}
DTSC::Scan S(pkt + 8, pktLen);
if (S.hasMember("moreheader") && S.getMember("moreheader").asInt()){
moreHeader = S.getMember("moreheader").asInt();
}else{
moreHeader = 0;
meta.reInit(isSingular() ? streamName : "", S);
}
free(pkt);
}while (moreHeader);
return meta;
}

View file

@ -194,13 +194,6 @@ namespace Mist{
bool InputEBML::readExistingHeader(){
if (!Input::readExistingHeader()){return false;}
std::set<size_t> validTracks = M.getValidTracks();
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
if (M.getCodec(*it) == "PCMLE"){
meta.setCodec(*it, "PCM");
swapEndianness.insert(*it);
}
}
if (M.inputLocalVars.isMember("timescale")){
timeScale = ((double)M.inputLocalVars["timescale"].asInt()) / 1000000.0;
}
@ -213,8 +206,6 @@ namespace Mist{
bool InputEBML::readHeader(){
if (!inFile){return false;}
// Create header file from file
uint64_t bench = Util::getMicros();
if (!meta || (needsLock() && isSingular())){
meta.reInit(isSingular() ? streamName : "");
}
@ -462,12 +453,13 @@ namespace Mist{
}
meta.inputLocalVars["version"] = 2;
bench = Util::getMicros(bench);
INFO_MSG("Header generated in %" PRIu64 " ms", bench / 1000);
clearPredictors();
bufferedPacks = 0;
M.toFile(config->getString("input") + ".dtsh");
return true;
}
void InputEBML::postHeader(){
//Record PCMLE tracks as being PCM with swapped endianness
std::set<size_t> validTracks = M.getValidTracks();
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
if (M.getCodec(*it) == "PCMLE"){
@ -475,7 +467,6 @@ namespace Mist{
swapEndianness.insert(*it);
}
}
return true;
}
void InputEBML::fillPacket(packetData &C){

View file

@ -135,6 +135,7 @@ namespace Mist{
bool checkArguments();
bool preRun();
bool readHeader();
void postHeader();
bool readElement();
void getNext(size_t idx = INVALID_TRACK_ID);
void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID);

View file

@ -79,7 +79,6 @@ namespace Mist{
bool inputFLV::readHeader(){
if (!inFile){return false;}
if (readExistingHeader()){return true;}
meta.reInit(isSingular() ? streamName : "");
// Create header file from FLV data
Util::fseek(inFile, 13, SEEK_SET);
@ -107,7 +106,6 @@ namespace Mist{
FLV::Parse_Error = false;
ERROR_MSG("Stopping at FLV parse error @%" PRIu64 ": %s", lastBytePos, FLV::Error_Str.c_str());
}
M.toFile(config->getString("input") + ".dtsh");
Util::fseek(inFile, 13, SEEK_SET);
return true;
}

View file

@ -9,7 +9,6 @@ namespace Mist{
protected:
bool checkArguments(){return false;};
bool readHeader(){return false;};
bool needHeader(){return false;};
void getNext(size_t idx = INVALID_TRACK_ID){}
void seek(uint64_t time, size_t idx = INVALID_TRACK_ID){}

View file

@ -16,7 +16,6 @@ namespace Mist{
std::string spsInfo;
uint64_t frameCount;
// Empty defaults
bool readHeader(){return true;}
bool openStreamSource();
void closeStreamSource(){}
void parseStreamHeader();

View file

@ -763,7 +763,6 @@ namespace Mist{
bool inputHLS::readHeader(){
if (streamIsLive && !isLiveDVR){return true;}
if (readExistingHeader()){return true;}
// to analyse and extract data
TS::Packet packet;
char *data;
@ -891,10 +890,6 @@ namespace Mist{
thisMappingsR[JSON::Value(pidIt->first).asString()] = pidIt->second;
}
meta.inputLocalVars["pidMappingR"] = thisMappingsR;
INFO_MSG("write header file...");
M.toFile((config->getString("input") + ".dtsh").c_str());
return true;
}

View file

@ -87,7 +87,6 @@ namespace Mist{
}
curBytePos = ftell(inFile);
}
M.toFile(config->getString("input") + ".dtsh");
return true;
}

View file

@ -85,7 +85,6 @@ namespace Mist{
fseek(inFile, 0, SEEK_SET);
timestamp = 0;
M.toFile(config->getString("input") + ".dtsh");
return true;
}

View file

@ -112,7 +112,6 @@ namespace Mist{
capa["source_match"].append("https://*.mp4");
capa["source_match"].append("s3+http://*.mp4");
capa["source_match"].append("s3+https://*.mp4");
capa["source_match"].append("mp4:*");
capa["source_file"] = "$source";
capa["priority"] = 9;
capa["codecs"]["video"].append("HEVC");
@ -147,9 +146,7 @@ namespace Mist{
bool inputMP4::preRun(){
// open File
std::string inUrl = config->getString("input");
if (inUrl.size() > 4 && inUrl.substr(0, 4) == "mp4:"){inUrl.erase(0, 4);}
inFile.open(inUrl);
inFile.open(config->getString("input"));
if (!inFile){return false;}
if (!inFile.isSeekable()){
FAIL_MSG("MP4 input only supports seekable data sources, for now, and this source is not seekable: %s", config->getString("input").c_str());
@ -160,6 +157,13 @@ namespace Mist{
void inputMP4::dataCallback(const char *ptr, size_t size){readBuffer.append(ptr, size);}
bool inputMP4::needHeader(){
//Attempt to read cache, but force calling of the readHeader function anyway
bool r = Input::needHeader();
if (!r){r = !readHeader();}
return r;
}
bool inputMP4::readHeader(){
if (!inFile){
Util::logExitReason("Could not open input file");
@ -221,14 +225,13 @@ namespace Mist{
}
// See whether a separate header file exists.
if (readExistingHeader()){
// If we already read a cached header, we can exit here.
if (M){
bps = 0;
std::set<size_t> tracks = M.getValidTracks();
for (std::set<size_t>::iterator it = tracks.begin(); it != tracks.end(); it++){bps += M.getBps(*it);}
return true;
}
INFO_MSG("Not reading existing header");
meta.reInit(isSingular() ? streamName : "");
tNumber = 0;
@ -459,14 +462,6 @@ namespace Mist{
}
}
// outputting dtsh file
std::string inUrl = config->getString("input");
if (inUrl.size() > 4 && inUrl.substr(0, 4) == "mp4:"){inUrl.erase(0, 4);}
if (inUrl != "-" && HTTP::URL(inUrl).isLocalPath()){
M.toFile(inUrl + ".dtsh");
}else{
INFO_MSG("Skipping header write, as the source is not a local file");
}
bps = 0;
std::set<size_t> tracks = M.getValidTracks();
for (std::set<size_t>::iterator it = tracks.begin(); it != tracks.end(); it++){bps += M.getBps(*it);}

View file

@ -80,7 +80,7 @@ namespace Mist{
bool checkArguments();
bool preRun();
bool readHeader();
bool needHeader(){return true;}
bool needHeader();
void getNext(size_t idx = INVALID_TRACK_ID);
void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID);
void handleSeek(uint64_t seekTime, size_t idx);

View file

@ -222,8 +222,6 @@ namespace Mist{
meta.update(thisPacket);
getNext();
}
meta.toFile(config->getString("input") + ".dtsh");
return true;
}

View file

@ -26,7 +26,6 @@ namespace Mist{
// Private Functions
bool checkArguments();
bool needHeader(){return false;}
bool readHeader(){return true;}
bool openStreamSource();
void closeStreamSource();
void parseStreamHeader();

View file

@ -26,7 +26,6 @@ namespace Mist{
bool checkArguments();
// Overwrite default functions from input
bool needHeader(){return false;}
bool readHeader(){return true;}
// Force to stream > serve
bool needsLock(){return false;}
// Open connection with input

View file

@ -57,9 +57,6 @@ namespace Mist{
meta.update(thisPacket);
getNext();
}
// outputting dtsh file
M.toFile(config->getString("input") + ".dtsh");
return true;
}

View file

@ -413,7 +413,6 @@ namespace Mist{
}
fseek(inFile, 0, SEEK_SET);
meta.toFile(config->getString("input") + ".dtsh");
return true;
}