From 3e2a17ff93035b9b38f967ad980ad4cb188397a9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 9 Nov 2022 10:35:07 +0100 Subject: [PATCH] Various metadata-related features and improvements: - Added support for new "NowMs" field that holds up to where no new packets are guaranteed to show up, in order to lower latency. - Added support for JSON tracks over all TS-based protocols (input and output) - Added support for AMF metadata conversion to JSON (RTMP/FLV input) - Fixed MP4 input subtitle tracks - Generalized websocket-based outputs to all support the same commands and run the same core logic - Added new "JSONLine" protocol that allows for generic direct line-by-line ingest of subtitles and/or JSON metadata tracks over a TCP socket or console standard input. --- CMakeLists.txt | 1 + lib/amf.cpp | 56 +++- lib/amf.h | 7 +- lib/dtsc.cpp | 31 +- lib/dtsc.h | 4 + lib/flv_tag.cpp | 26 +- lib/stream.cpp | 7 +- lib/stream.h | 1 + lib/ts_packet.cpp | 4 + lib/ts_stream.cpp | 19 ++ lib/ts_stream.h | 7 + lib/util.cpp | 5 + lib/util.h | 2 + src/input/input_buffer.cpp | 2 + src/input/input_mp4.cpp | 11 +- src/input/input_ts.cpp | 26 +- src/input/input_tsrist.cpp | 26 +- src/input/input_tssrt.cpp | 26 +- src/output/meson.build | 1 + src/output/output.cpp | 144 ++++----- src/output/output.h | 1 + src/output/output_http.cpp | 441 +++++++++++++++++++++++++++- src/output/output_http.h | 22 +- src/output/output_http_internal.cpp | 5 +- src/output/output_httpts.cpp | 1 + src/output/output_json.cpp | 40 ++- src/output/output_jsonline.cpp | 112 +++++++ src/output/output_jsonline.h | 26 ++ src/output/output_mp4.cpp | 328 +-------------------- src/output/output_mp4.h | 8 - src/output/output_rtmp.cpp | 20 +- src/output/output_srt.cpp | 38 ++- src/output/output_ts.cpp | 23 ++ src/output/output_ts_base.cpp | 6 + src/output/output_tsrist.cpp | 23 ++ src/output/output_tssrt.cpp | 23 ++ 36 files changed, 1054 insertions(+), 469 deletions(-) create mode 100644 src/output/output_jsonline.cpp create mode 100644 src/output/output_jsonline.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 78032193..2f888fa8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -551,6 +551,7 @@ endmacro() makeOutput(RTMP rtmp) makeOutput(DTSC dtsc) +makeOutput(JSONLine jsonline) makeOutput(OGG ogg http) makeOutput(FLV flv http) makeOutput(HTTPMinimalServer http_minimalserver http) diff --git a/lib/amf.cpp b/lib/amf.cpp index 54863c93..da29b8f9 100644 --- a/lib/amf.cpp +++ b/lib/amf.cpp @@ -6,7 +6,7 @@ #include /// Returns the std::string Indice for the current object, if available. /// Returns an empty string if no indice exists. -std::string AMF::Object::Indice(){ +std::string AMF::Object::Indice() const{ return myIndice; } @@ -190,6 +190,52 @@ std::string AMF::Object::Print(std::string indent){ return st.str(); }// print +JSON::Value AMF::Object::toJSON() const{ + switch (myType){ + case AMF::AMF0_NUMBER: + case AMF::AMF0_DATE: + case AMF::AMF0_REFERENCE: + return numval; + case AMF::AMF0_BOOL: + return (bool)numval; + case AMF::AMF0_STRING: + case AMF::AMF0_LONGSTRING: + case AMF::AMF0_XMLDOC: // is always a longstring + return strval; + case AMF::AMF0_TYPED_OBJ: // is an object, with the classname first + case AMF::AMF0_OBJECT: + case AMF::AMF0_ECMA_ARRAY:{ + JSON::Value ret; + if (contents.size() > 0){ + for (std::vector::const_iterator it = contents.begin(); it != contents.end(); it++){ + ret[it->Indice()] = it->toJSON(); + } + } + return ret; + } + case AMF::AMF0_MOVIECLIP: + case AMF::AMF0_OBJ_END: + case AMF::AMF0_UPGRADE: + case AMF::AMF0_NULL: + case AMF::AMF0_UNDEFINED: + case AMF::AMF0_RECORDSET: + case AMF::AMF0_UNSUPPORTED: + // no data to add + return JSON::Value(); + case AMF::AMF0_DDV_CONTAINER: // only send contents + case AMF::AMF0_STRICT_ARRAY:{ + JSON::Value ret; + if (contents.size() > 0){ + for (std::vector::const_iterator it = contents.begin(); it != contents.end(); it++){ + ret.append(it->toJSON()); + } + } + return ret; + } + } + return JSON::Value(); +} + /// Packs the AMF object to a std::string for transfer over the network. /// If the object is a container type, this function will call itself recursively and contain all /// contents. Tip: When sending multiple AMF objects in one go, put them in a single @@ -489,7 +535,7 @@ AMF::Object AMF::parse(std::string data){ /// Returns the std::string Indice for the current object, if available. /// Returns an empty string if no indice exists. -std::string AMF::Object3::Indice(){ +std::string AMF::Object3::Indice() const{ return myIndice; } @@ -695,10 +741,16 @@ std::string AMF::Object3::Print(std::string indent){ /// contents. Tip: When sending multiple AMF objects in one go, put them in a single /// AMF::AMF0_DDV_CONTAINER for easy transfer. std::string AMF::Object3::Pack(){ + /// \TODO Implement std::string r = ""; return r; }// pack +JSON::Value AMF::Object3::toJSON() const{ + /// \TODO Implement + return JSON::Value(); +} + /// Parses a single AMF3 type - used recursively by the AMF::parse3() functions. /// This function updates i every call with the new position in the data. /// \param data The raw data to parse. diff --git a/lib/amf.h b/lib/amf.h index 0777dae9..8fad797f 100644 --- a/lib/amf.h +++ b/lib/amf.h @@ -5,6 +5,7 @@ #include #include #include +#include "json.h" /// Holds all AMF parsing and creation related functions and classes. namespace AMF{ @@ -55,7 +56,7 @@ namespace AMF{ /// container type. class Object{ public: - std::string Indice(); + std::string Indice() const; obj0type GetType(); double NumValue(); std::string StrValue(); @@ -73,6 +74,7 @@ namespace AMF{ Object(std::string indice, obj0type setType = AMF0_OBJECT); std::string Print(std::string indent = ""); std::string Pack(); + JSON::Value toJSON() const; protected: std::string myIndice; ///< Holds this objects indice, if any. @@ -95,7 +97,7 @@ namespace AMF{ /// container type. class Object3{ public: - std::string Indice(); + std::string Indice() const; obj3type GetType(); double DblValue(); int IntValue(); @@ -114,6 +116,7 @@ namespace AMF{ Object3(std::string indice, obj3type setType = AMF3_OBJECT); std::string Print(std::string indent = ""); std::string Pack(); + JSON::Value toJSON() const; protected: std::string myIndice; ///< Holds this objects indice, if any. diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index bfd887d6..6c540fef 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -991,13 +991,13 @@ namespace DTSC{ setBootMsOffset(src.getMember("unixzero").asInt() - Util::unixMS() + Util::bootMS()); }else{ MEDIUM_MSG("No member \'unixzero\' found in DTSC::Scan. Calculating locally."); - int64_t lastMs = 0; + int64_t nowMs = 0; for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ - if (it->second.track.getInt(it->second.trackLastmsField) > lastMs){ - lastMs = it->second.track.getInt(it->second.trackLastmsField); + if (it->second.track.getInt(it->second.trackNowmsField) > nowMs){ + nowMs = it->second.track.getInt(it->second.trackNowmsField); } } - setBootMsOffset(Util::bootMS() - lastMs); + setBootMsOffset(Util::bootMS() - nowMs); } } @@ -1243,6 +1243,9 @@ namespace DTSC{ t.trackCodecField = t.track.getFieldData("codec"); t.trackFirstmsField = t.track.getFieldData("firstms"); t.trackLastmsField = t.track.getFieldData("lastms"); + t.trackNowmsField = t.track.getFieldData("nowms"); + // If there is no nowMs field, fall back to the lastMs field instead ( = old behaviour). + if (!t.trackNowmsField){t.trackNowmsField = t.trackLastmsField;} t.trackBpsField = t.track.getFieldData("bps"); t.trackMaxbpsField = t.track.getFieldData("maxbps"); t.trackLangField = t.track.getFieldData("lang"); @@ -1332,6 +1335,9 @@ namespace DTSC{ t.trackCodecField = t.track.getFieldData("codec"); t.trackFirstmsField = t.track.getFieldData("firstms"); t.trackLastmsField = t.track.getFieldData("lastms"); + t.trackNowmsField = t.track.getFieldData("nowms"); + // If there is no nowMs field, fall back to the lastMs field instead ( = old behaviour). + if (!t.trackNowmsField){t.trackNowmsField = t.trackLastmsField;} t.trackBpsField = t.track.getFieldData("bps"); t.trackMaxbpsField = t.track.getFieldData("maxbps"); t.trackLangField = t.track.getFieldData("lang"); @@ -1542,6 +1548,11 @@ namespace DTSC{ t.track.setString(t.trackCodecField, origAccess.getPointer("codec")); t.track.setInt(t.trackFirstmsField, origAccess.getInt("firstms")); t.track.setInt(t.trackLastmsField, origAccess.getInt("lastms")); + if (origAccess.hasField("nowms")){ + t.track.setInt(t.trackNowmsField, origAccess.getInt("nowms")); + }else{ + t.track.setInt(t.trackNowmsField, origAccess.getInt("lastms")); + } t.track.setInt(t.trackBpsField, origAccess.getInt("bps")); t.track.setInt(t.trackMaxbpsField, origAccess.getInt("maxbps")); t.track.setString(t.trackLangField, origAccess.getPointer("lang")); @@ -1807,6 +1818,9 @@ namespace DTSC{ t.trackCodecField = t.track.getFieldData("codec"); t.trackFirstmsField = t.track.getFieldData("firstms"); t.trackLastmsField = t.track.getFieldData("lastms"); + t.trackNowmsField = t.track.getFieldData("nowms"); + // If there is no nowMs field, fall back to the lastMs field instead ( = old behaviour). + if (!t.trackNowmsField){t.trackNowmsField = t.trackLastmsField;} t.trackBpsField = t.track.getFieldData("bps"); t.trackMaxbpsField = t.track.getFieldData("maxbps"); t.trackLangField = t.track.getFieldData("lang"); @@ -1999,6 +2013,15 @@ namespace DTSC{ return t.track.getInt(t.trackLastmsField); } + void Meta::setNowms(size_t trackIdx, uint64_t nowms){ + DTSC::Track &t = tracks.at(trackIdx); + t.track.setInt(t.trackNowmsField, nowms); + } + uint64_t Meta::getNowms(size_t trackIdx) const{ + const DTSC::Track &t = tracks.find(trackIdx)->second; + return t.track.getInt(t.trackNowmsField); + } + uint64_t Meta::getDuration(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackLastmsField) - t.track.getInt(t.trackFirstmsField); diff --git a/lib/dtsc.h b/lib/dtsc.h index 9a2dbd7c..64d6e979 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -238,6 +238,7 @@ namespace DTSC{ Util::RelAccXFieldData trackCodecField; Util::RelAccXFieldData trackFirstmsField; Util::RelAccXFieldData trackLastmsField; + Util::RelAccXFieldData trackNowmsField; Util::RelAccXFieldData trackBpsField; Util::RelAccXFieldData trackMaxbpsField; Util::RelAccXFieldData trackLangField; @@ -375,6 +376,9 @@ namespace DTSC{ void setLastms(size_t trackIdx, uint64_t lastms); uint64_t getLastms(size_t trackIdx) const; + void setNowms(size_t trackIdx, uint64_t nowms); + uint64_t getNowms(size_t trackIdx) const; + uint64_t getDuration(size_t trackIdx) const; void setBps(size_t trackIdx, uint64_t bps); diff --git a/lib/flv_tag.cpp b/lib/flv_tag.cpp index 780c6c5a..681bcb7e 100644 --- a/lib/flv_tag.cpp +++ b/lib/flv_tag.cpp @@ -825,7 +825,21 @@ void FLV::Tag::toMeta(DTSC::Meta &meta, AMF::Object &amf_storage, size_t &reTrac case 0x12: trackType = "meta"; break; // meta } + if (meta.getVod() && reTrack == INVALID_TRACK_ID){ + reTrack = meta.trackIDToIndex(getTrackID(), getpid()); + } + + if (reTrack == INVALID_TRACK_ID){ + reTrack = meta.addTrack(); + meta.setID(reTrack, getTrackID()); + if (targetParams.count("lang")){ + meta.setLang(reTrack, targetParams.at("lang")); + } + } + if (data[0] == 0x12){ + meta.setType(reTrack, "meta"); + meta.setCodec(reTrack, "JSON"); AMF::Object meta_in = AMF::parse((unsigned char *)data + 11, len - 15); AMF::Object *tmp = 0; if (meta_in.getContentP(1) && meta_in.getContentP(0) && (meta_in.getContentP(0)->StrValue() == "onMetaData")){ @@ -839,18 +853,6 @@ void FLV::Tag::toMeta(DTSC::Meta &meta, AMF::Object &amf_storage, size_t &reTrac return; } - if (meta.getVod() && reTrack == INVALID_TRACK_ID){ - reTrack = meta.trackIDToIndex(getTrackID(), getpid()); - } - - if (reTrack == INVALID_TRACK_ID){ - reTrack = meta.addTrack(); - meta.setID(reTrack, getTrackID()); - if (targetParams.count("lang")){ - meta.setLang(reTrack, targetParams.at("lang")); - } - } - std::string codec = meta.getCodec(reTrack); if (data[0] == 0x08 && (codec == "" || codec != getAudioCodec() || (needsInitData() && isInitData()))){ char audiodata = data[11]; diff --git a/lib/stream.cpp b/lib/stream.cpp index e7fabdc1..1a3afe98 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -1274,7 +1274,7 @@ std::set Util::pickTracks(const DTSC::Meta &M, const std::set tr /// It is necessary to follow up with a selectDefaultTracks() call to strip unsupported /// codecs/combinations. std::set Util::findTracks(const DTSC::Meta &M, const JSON::Value &capa, const std::string &trackType, const std::string &trackVal, const std::string &UA){ - std::set validTracks = capa?getSupportedTracks(M, capa, "", UA):M.getValidTracks(); + std::set validTracks = capa?getSupportedTracks(M, capa, "", UA):M.getValidTracks(true); return pickTracks(M, validTracks, trackType, trackVal); } @@ -1288,7 +1288,7 @@ std::set Util::wouldSelect(const DTSC::Meta &M, const std::string &track std::set Util::getSupportedTracks(const DTSC::Meta &M, const JSON::Value &capa, const std::string &type, const std::string &UA){ - std::set validTracks = M.getValidTracks(); + std::set validTracks = M.getValidTracks(true); uint64_t maxLastMs = 0; std::set toRemove; for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ @@ -1415,7 +1415,7 @@ std::set Util::wouldSelect(const DTSC::Meta &M, const std::map validTracks = M.getValidTracks(); + std::set validTracks = M.getValidTracks(true); if (capa){validTracks = getSupportedTracks(M, capa);} // check which tracks don't actually exist @@ -1624,6 +1624,7 @@ std::set Util::wouldSelect(const DTSC::Meta &M, const std::map offset+1){ @@ -1058,6 +1072,11 @@ namespace TS{ codec = "AC3"; size = 16; }break; + case JSON:{ + addNewTrack = true; + type = "meta"; + codec = "JSON"; + }break; case OPUS:{ addNewTrack = true; type = "audio"; diff --git a/lib/ts_stream.h b/lib/ts_stream.h index 023dcd42..01f5c3c6 100644 --- a/lib/ts_stream.h +++ b/lib/ts_stream.h @@ -22,6 +22,11 @@ namespace TS{ OPUS = 0x060001 }; + enum rawDataType{ + NONE = 0, + JSON + }; + class ADTSRemainder{ private: char *data; @@ -75,9 +80,11 @@ namespace TS{ std::set getActiveTracks(); void setLastms(size_t tid, uint64_t timestamp); + void setRawDataParser(rawDataType parser); private: uint64_t lastPAT; + rawDataType rParser; ProgramAssociationTable associationTable; std::map remainders; diff --git a/lib/util.cpp b/lib/util.cpp index 9e317b81..4c561069 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -840,6 +840,11 @@ namespace Util{ return true; } + /// Returns true if the given field exists. + bool RelAccX::hasField(const std::string & name) const{ + return (fields.find(name) != fields.end()); + } + /// Returns the (max) size of the given field. /// For string types, returns the exact size excluding terminating null byte. /// For other types, returns the maximum size possible. diff --git a/lib/util.h b/lib/util.h index 0970807e..d0332228 100644 --- a/lib/util.h +++ b/lib/util.h @@ -86,6 +86,7 @@ namespace Util{ size = s; offset = o; } + operator bool() const {return offset;} }; #define RAX_NESTED 0x01 @@ -158,6 +159,7 @@ namespace Util{ bool isExit() const; bool isReload() const; bool isRecordAvailable(uint64_t recordNo) const; + bool hasField(const std::string &name) const; uint32_t getSize(const std::string &name, uint64_t recordNo = 0) const; char *getPointer(const std::string &name, uint64_t recordNo = 0) const; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index c974e4e5..eb7dd2a5 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -349,6 +349,8 @@ namespace Mist{ } for (std::set::iterator idx = tracks.begin(); idx != tracks.end(); idx++){ size_t i = *idx; + //Don't delete idle metadata tracks + if (M.getType(i) == "meta"){continue;} uint64_t lastUp = M.getLastUpdated(i); //Prevent issues when getLastUpdated > current time. This can happen if the second rolls over exactly during this loop. if (lastUp >= time){continue;} diff --git a/src/input/input_mp4.cpp b/src/input/input_mp4.cpp index 342c1612..8f1ae5e0 100644 --- a/src/input/input_mp4.cpp +++ b/src/input/input_mp4.cpp @@ -478,7 +478,7 @@ namespace Mist{ packSendSize = 24 + (BsetPart.timeOffset ? 17 : 0) + (BsetPart.bpos ? 15 : 0) + 19 + stszBox.getEntrySize(stszIndex) + 11 - 2 + 19; meta.update(BsetPart.time, BsetPart.timeOffset, tNumber, - stszBox.getEntrySize(stszIndex) - 2, BsetPart.bpos, true, packSendSize); + stszBox.getEntrySize(stszIndex) - 2, BsetPart.bpos+2, true, packSendSize); } }else{ meta.update(BsetPart.time, BsetPart.timeOffset, tNumber, @@ -558,16 +558,11 @@ namespace Mist{ } if (M.getCodec(curPart.trackID) == "subtitle"){ - unsigned int txtLen = Bit::btohs(readBuffer + (curPart.bpos-readPos)); - if (!txtLen && false){ - curPart.index++; - return getNext(idx); - } static JSON::Value thisPack; thisPack.null(); thisPack["trackid"] = (uint64_t)curPart.trackID; thisPack["bpos"] = curPart.bpos; //(long long)fileSource.tellg(); - thisPack["data"] = std::string(readBuffer + (curPart.bpos-readPos) + 2, txtLen); + thisPack["data"] = std::string(readBuffer + (curPart.bpos-readPos), curPart.size); thisPack["time"] = curPart.time; if (curPart.duration){thisPack["duration"] = curPart.duration;} thisPack["keyframe"] = true; @@ -583,6 +578,7 @@ namespace Mist{ curPart.index++; if (curPart.index < headerData(M.getID(curPart.trackID)).size()){ headerData(M.getID(curPart.trackID)).getPart(curPart.index, curPart.bpos); + if (M.getCodec(curPart.trackID) == "subtitle"){curPart.bpos += 2;} curPart.size = parts.getSize(curPart.index); curPart.offset = parts.getOffset(curPart.index); curPart.time = M.getPartTime(curPart.index, thisIdx); @@ -616,6 +612,7 @@ namespace Mist{ for (size_t i = 0; i < headerDataSize; i++){ thisHeader.getPart(i, addPart.bpos); + if (M.getCodec(idx) == "subtitle"){addPart.bpos += 2;} addPart.size = parts.getSize(i); addPart.offset = parts.getOffset(i); addPart.time = M.getPartTime(i, idx); diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index a94d2e9e..23d7f9e6 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -243,6 +243,25 @@ namespace Mist{ capa["optional"]["segmentsize"]["type"] = "uint"; capa["optional"]["segmentsize"]["default"] = 1900; + capa["optional"]["datatrack"]["name"] = "MPEG Data track parser"; + capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks"; + capa["optional"]["datatrack"]["type"] = "select"; + capa["optional"]["datatrack"]["option"] = "--datatrack"; + capa["optional"]["datatrack"]["short"] = "D"; + capa["optional"]["datatrack"]["default"] = ""; + capa["optional"]["datatrack"]["select"][0u][0u] = ""; + capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled"; + capa["optional"]["datatrack"]["select"][1u][0u] = "json"; + capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON"; + + JSON::Value option; + option["long"] = "datatrack"; + option["short"] = "D"; + option["arg"] = "string"; + option["default"] = ""; + option["help"] = "Which parser to use for data tracks"; + config->addOption("datatrack", option); + capa["optional"]["fallback_stream"]["name"] = "Fallback stream"; capa["optional"]["fallback_stream"]["help"] = "Alternative stream to load for playback when there is no active broadcast"; @@ -253,7 +272,7 @@ namespace Mist{ capa["optional"]["raw"]["help"] = "Enable raw MPEG-TS passthrough mode"; capa["optional"]["raw"]["option"] = "--raw"; - JSON::Value option; + option.null(); option["long"] = "raw"; option["short"] = "R"; option["help"] = "Enable raw MPEG-TS passthrough mode"; @@ -277,6 +296,11 @@ namespace Mist{ config->getOption("input", true).append("ts-exec:srt-live-transmit " + srtUrl.getUrl() + " file://con"); INFO_MSG("Rewriting SRT source '%s' to '%s'", source.c_str(), config->getString("input").c_str()); } + if (config->getString("datatrack") == "json"){ + liveStream.setRawDataParser(TS::JSON); + tsStream.setRawDataParser(TS::JSON); + } + // We call preRun early and, if successful, close the opened reader. // This is to ensure we have udpMode/rawMode/standAlone all set properly before the first call to needsLock. // The reader must be closed so that the angel process does not have a reader open. diff --git a/src/input/input_tsrist.cpp b/src/input/input_tsrist.cpp index ef079338..f9423693 100644 --- a/src/input/input_tsrist.cpp +++ b/src/input/input_tsrist.cpp @@ -146,6 +146,25 @@ namespace Mist{ option["help"] = "Enable raw MPEG-TS passthrough mode"; config->addOption("raw", option); + capa["optional"]["datatrack"]["name"] = "MPEG Data track parser"; + capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks"; + capa["optional"]["datatrack"]["type"] = "select"; + capa["optional"]["datatrack"]["option"] = "--datatrack"; + capa["optional"]["datatrack"]["short"] = "D"; + capa["optional"]["datatrack"]["default"] = ""; + capa["optional"]["datatrack"]["select"][0u][0u] = ""; + capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled"; + capa["optional"]["datatrack"]["select"][1u][0u] = "json"; + capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON"; + + option.null(); + option["long"] = "datatrack"; + option["short"] = "D"; + option["arg"] = "string"; + option["default"] = ""; + option["help"] = "Which parser to use for data tracks"; + config->addOption("datatrack", option); + lastTimeStamp = 0; timeStampOffset = 0; receiver_ctx = 0; @@ -156,7 +175,12 @@ namespace Mist{ rist_destroy(receiver_ctx); } - bool inputTSRIST::checkArguments(){return true;} + bool inputTSRIST::checkArguments(){ + if (config->getString("datatrack") == "json"){ + tsStream.setRawDataParser(TS::JSON); + } + return true; + } /// Live Setup of SRT Input. Runs only if we are the "main" thread bool inputTSRIST::preRun(){ diff --git a/src/input/input_tssrt.cpp b/src/input/input_tssrt.cpp index bd142ab9..def760de 100644 --- a/src/input/input_tssrt.cpp +++ b/src/input/input_tssrt.cpp @@ -117,6 +117,25 @@ namespace Mist{ option["help"] = "Enable raw MPEG-TS passthrough mode"; config->addOption("raw", option); + capa["optional"]["datatrack"]["name"] = "MPEG Data track parser"; + capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks"; + capa["optional"]["datatrack"]["type"] = "select"; + capa["optional"]["datatrack"]["option"] = "--datatrack"; + capa["optional"]["datatrack"]["short"] = "D"; + capa["optional"]["datatrack"]["default"] = ""; + capa["optional"]["datatrack"]["select"][0u][0u] = ""; + capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled"; + capa["optional"]["datatrack"]["select"][1u][0u] = "json"; + capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON"; + + option.null(); + option["long"] = "datatrack"; + option["short"] = "D"; + option["arg"] = "string"; + option["default"] = ""; + option["help"] = "Which parser to use for data tracks"; + config->addOption("datatrack", option); + // Setup if we are called form with a thread for push-based input. if (s.connected()){ srtConn = s; @@ -140,7 +159,12 @@ namespace Mist{ inputTSSRT::~inputTSSRT(){} - bool inputTSSRT::checkArguments(){return true;} + bool inputTSSRT::checkArguments(){ + if (config->getString("datatrack") == "json"){ + tsStream.setRawDataParser(TS::JSON); + } + return true; + } /// Live Setup of SRT Input. Runs only if we are the "main" thread bool inputTSSRT::preRun(){ diff --git a/src/output/meson.build b/src/output/meson.build index 6b4a2d65..fe781353 100644 --- a/src/output/meson.build +++ b/src/output/meson.build @@ -21,6 +21,7 @@ outputs = [ {'name' : 'WAV', 'format' : 'wav', 'extra': ['http']}, {'name' : 'SDP', 'format' : 'sdp', 'extra': ['http']}, {'name' : 'HTTP', 'format' : 'http_internal', 'extra': ['http','embed']}, + {'name' : 'JSONLine', 'format' : 'jsonline'}, ] if usessl diff --git a/src/output/output.cpp b/src/output/output.cpp index bec82727..6c7a8fcf 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -94,7 +94,8 @@ namespace Mist{ dataWaitTimeout = 2500; pushing = false; recursingSync = false; - firstTime = 0; + firstTime = Util::bootMS(); + thisTime = 0; firstPacketTime = 0xFFFFFFFFFFFFFFFFull; lastPacketTime = 0; tkn = ""; @@ -247,7 +248,7 @@ namespace Mist{ size_t mainTrack = getMainSelectedTrack(); if (mainTrack != INVALID_TRACK_ID){ DTSC::Keys keys(M.keys(mainTrack)); - if (keys.getValidCount() >= minTracks || M.getLastms(mainTrack) - M.getFirstms(mainTrack) > minMs){ + if (keys.getValidCount() >= minTracks || M.getNowms(mainTrack) - M.getFirstms(mainTrack) > minMs){ return true; } HIGH_MSG("NOT READY YET (%zu tracks, main track: %zu, with %zu keys)", @@ -393,7 +394,7 @@ namespace Mist{ } bool autoSeek = buffer.size(); - uint64_t seekTarget = buffer.getSyncMode()?currentTime():0; + uint64_t seekTarget = buffer.getSyncMode()?thisTime:0; std::set newSelects = Util::wouldSelect(M, targetParams, capa, UA, autoSeek ? seekTarget : 0); @@ -401,7 +402,7 @@ namespace Mist{ std::set toRemove; for (std::set::iterator it = newSelects.begin(); it != newSelects.end(); it++){ // autoSeeking and target not in bounds? Drop it too. - if (M.getLastms(*it) < std::max(seekTarget, (uint64_t)6000lu) - 6000){ + if (M.getNowms(*it) < std::max(seekTarget, (uint64_t)6000lu) - 6000){ toRemove.insert(*it); } } @@ -493,7 +494,7 @@ namespace Mist{ //Abort if there are no keys if (!keys.getValidCount()){return 0;} //Get the key for the current time - size_t keyNum = M.getKeyNumForTime(trk, lastPacketTime); + size_t keyNum = M.getKeyNumForTime(trk, thisTime); if (keyNum == INVALID_KEY_NUM){return 0;} if (keys.getEndValid() <= keyNum+1){return 0;} //Return the next key @@ -720,6 +721,13 @@ namespace Mist{ return buffer.begin()->time; } + /// Return the intended target current time of the media buffer (as opposed to actual) + /// This takes into account the current playback speed as well as the maxSkipAhead setting. + uint64_t Output::targetTime(){ + if (!realTime){return currentTime();} + return (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead); + } + /// Return the start time of the selected tracks. /// Returns the start time of earliest track if nothing is selected. /// Returns zero if no tracks exist. @@ -836,16 +844,16 @@ namespace Mist{ } HIGH_MSG("Seeking for pos %" PRIu64, pos); - if (meta.getLive() && meta.getLastms(tid) < pos){ + if (meta.getLive() && meta.getNowms(tid) < pos){ unsigned int maxTime = 0; - while (meta.getLastms(tid) < pos && myConn && ++maxTime <= 20 && keepGoing()){ + while (meta.getNowms(tid) < pos && myConn && ++maxTime <= 20 && keepGoing()){ Util::wait(500); stats(); } } - if (meta.getLastms(tid) < pos){ + if (meta.getNowms(tid) < pos){ WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: past end of track (= %" PRIu64 "ms).", - pos, tid, meta.getLastms(tid)); + pos, tid, meta.getNowms(tid)); userSelect.erase(tid); return false; } @@ -884,46 +892,24 @@ namespace Mist{ tmpPack.reInit(curPage[tid].mapped + tmp.offset, 0, true); tmp.time = tmpPack.getTime(); char *mpd = curPage[tid].mapped; + uint64_t nowMs = M.getNowms(tid); while (tmp.time < pos && tmpPack){ tmp.offset += tmpPack.getDataLen(); tmpPack.reInit(mpd + tmp.offset, 0, true); tmp.time = tmpPack.getTime(); } if (tmpPack){ + tmp.ghostPacket = false; HIGH_MSG("Sought to time %" PRIu64 " in %s", tmp.time, curPage[tid].name.c_str()); tmp.partIndex = M.getPartIndex(tmpPack.getTime(), tmp.tid); buffer.insert(tmp); return true; } - // don't print anything for empty packets - not sign of corruption, just unfinished stream. - if (curPage[tid].mapped[tmp.offset] != 0){ - //There's a chance the packet header was written in between this check and the previous. - //Let's check one more time before aborting - tmpPack.reInit(mpd + tmp.offset, 0, true); - tmp.time = tmpPack.getTime(); - if (tmpPack){ - HIGH_MSG("Sought to time %" PRIu64 " in %s", tmp.time, curPage[tid].name.c_str()); - tmp.partIndex = M.getPartIndex(tmpPack.getTime(), tmp.tid); - buffer.insert(tmp); - return true; - } - FAIL_MSG("Noes! Couldn't find packet on track %zu because of some kind of corruption error " - "or somesuch.", - tid); - return false; - } - VERYHIGH_MSG("Track %zu no data (key %" PRIu32 " @ %" PRIu64 ") - waiting...", tid, - keyNum + (getNextKey ? 1 : 0), tmp.offset); - uint32_t i = 0; - while (meta.getVod() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ - Util::wait(100 * i); - stats(); - } - if (curPage[tid].mapped[tmp.offset]){return seek(tid, pos, getNextKey);} - FAIL_MSG("Track %zu no data (key %" PRIu32 "@%" PRIu64 ", page %s, time %" PRIu64 " -> %" PRIu64 ", next=%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset, curPage[tid].name.c_str(), pos, actualKeyTime, keys.getTime(keyNum+1)); - userSelect.erase(tid); - firstTime = Util::bootMS() - (buffer.begin()->time * realTime / 1000); - return false; + tmp.partIndex = M.getPartIndex(nowMs, tmp.tid); + tmp.ghostPacket = true; + tmp.time = nowMs; + buffer.insert(tmp); + return true; } /// This function decides where in the stream initial playback starts. @@ -948,7 +934,7 @@ namespace Mist{ bool good = true; // check if all tracks have data for this point in time for (std::map::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){ - if (meta.getLastms(ti->first) < seekPos + needsLookAhead){ + if (meta.getNowms(ti->first) < seekPos + needsLookAhead){ good = false; break; } @@ -957,15 +943,15 @@ namespace Mist{ HIGH_MSG("Skipping track %zu, not in tracks", ti->first); continue; }// ignore missing tracks - if (M.getLastms(ti->first) < seekPos + needsLookAhead + M.getMinKeepAway(ti->first)){ + if (M.getNowms(ti->first) < seekPos + needsLookAhead + M.getMinKeepAway(ti->first)){ good = false; break; } - if (meta.getLastms(ti->first) == M.getFirstms(ti->first)){ + if (meta.getNowms(ti->first) == M.getFirstms(ti->first)){ HIGH_MSG("Skipping track %zu, last equals first", ti->first); continue; }// ignore point-tracks - if (meta.getLastms(ti->first) < seekPos){ + if (meta.getNowms(ti->first) < seekPos){ good = false; break; } @@ -1099,7 +1085,7 @@ namespace Mist{ if (M.getLive() && (targetParams.count("startunix") || targetParams.count("stopunix"))){ uint64_t unixStreamBegin = Util::epoch() - endTime()/1000; size_t mainTrack = getMainSelectedTrack(); - int64_t streamAvail = M.getLastms(mainTrack); + int64_t streamAvail = M.getNowms(mainTrack); if (targetParams.count("startunix")){ int64_t startUnix = atoll(targetParams["startunix"].c_str()); if (startUnix < 0){ @@ -1116,8 +1102,7 @@ namespace Mist{ } } if (startUnix < unixStreamBegin){ - WARN_MSG("Start time is earlier than stream begin - starting earliest possible"); - WARN_MSG("%" PRId64 " < %" PRId64, startUnix, unixStreamBegin); + WARN_MSG("Start time (%" PRId64 ") is earlier than stream begin (%" PRId64 ") - starting earliest possible", startUnix, unixStreamBegin); targetParams["start"] = "-1"; }else{ targetParams["start"] = JSON::Value((startUnix - unixStreamBegin) * 1000).asString(); @@ -1145,20 +1130,20 @@ namespace Mist{ if (targetParams.count("start") && atoll(targetParams["start"].c_str()) != 0){ size_t mainTrack = getMainSelectedTrack(); int64_t startRec = atoll(targetParams["start"].c_str()); - if (startRec > M.getLastms(mainTrack)){ + if (startRec > M.getNowms(mainTrack)){ if (!M.getLive()){ onFail("Playback start past end of non-live source", true); return; } - int64_t streamAvail = M.getLastms(mainTrack); + int64_t streamAvail = M.getNowms(mainTrack); int64_t lastUpdated = Util::getMS(); INFO_MSG("Waiting for stream to reach playback starting point. Current last ms is '%" PRIu64 "'", streamAvail); while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail && keepGoing()){ Util::sleep(500); - if (M.getLastms(mainTrack) > streamAvail){ + if (M.getNowms(mainTrack) > streamAvail){ HIGH_MSG("Waiting for stream to reach playback starting point. Current last ms is '%" PRIu64 "'", streamAvail); stats(); - streamAvail = M.getLastms(mainTrack); + streamAvail = M.getNowms(mainTrack); lastUpdated = Util::getMS(); } } @@ -1309,12 +1294,12 @@ namespace Mist{ bool Output::reachedPlannedStop(){ // If we're recording to file and reached the target position, stop if (isRecordingToFile && targetParams.count("recstop") && - atoll(targetParams["recstop"].c_str()) <= lastPacketTime){ + atoll(targetParams["recstop"].c_str()) <= thisTime){ INFO_MSG("End of planned recording reached"); return true; } // Regardless of playback method, if we've reached the wanted stop point, stop - if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) <= lastPacketTime){ + if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) <= thisTime){ INFO_MSG("End of planned playback reached"); return true; } @@ -1331,7 +1316,7 @@ namespace Mist{ return false; } // is this a split point? - if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= lastPacketTime){ + if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= thisTime){ INFO_MSG("Split point reached"); return true; } @@ -1610,10 +1595,11 @@ namespace Mist{ // slow down processing, if real time speed is wanted if (realTime && buffer.getSyncMode()){ - uint8_t i = 6; - while (--i && thisPacket.getTime() > (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead) && + uint64_t amount = thisTime - targetTime(); + size_t i = (amount / 1000) + 6; + while (--i && thisTime > targetTime() && keepGoing()){ - uint64_t amount = thisPacket.getTime() - (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead); + amount = thisTime - targetTime(); if (amount > 1000){amount = 1000;} idleTime(amount); //Make sure we stay responsive to requests and stats while waiting @@ -1633,23 +1619,23 @@ namespace Mist{ // wait at most double the look ahead time, plus ten seconds uint64_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000 / sleepTime); uint64_t needsTime = thisTime + needsLookAhead; - bool firstTime = true; + bool firstLookahead = true; while (--timeoutTries && keepGoing()){ bool lookReady = true; for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - if (meta.getLastms(it->first) <= needsTime){ + if (meta.getNowms(it->first) <= needsTime){ if (timeoutTries == 1){ WARN_MSG("Track %zu: %" PRIu64 " <= %" PRIu64, it->first, - meta.getLastms(it->first), needsTime); + meta.getNowms(it->first), needsTime); } lookReady = false; break; } } if (lookReady){break;} - if (firstTime){ - firstTime = false; + if (firstLookahead){ + firstLookahead = false; }else{ playbackSleep(sleepTime); } @@ -1760,7 +1746,7 @@ namespace Mist{ onFinish(); break; } - uint64_t endRec = lastPacketTime + atoll(targetParams["split"].c_str()) * 1000; + uint64_t endRec = thisTime + atoll(targetParams["split"].c_str()) * 1000; targetParams["nxt-split"] = JSON::Value(endRec).asString(); sentHeader = false; sendHeader(); @@ -1987,24 +1973,34 @@ namespace Mist{ return false; } - // if we're going to read past the end of the data page, load the next page - // this only happens for VoD + // if we're going to read past the end of the data page... if (nxt.offset >= curPage[nxt.tid].len || (!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){ - if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){ + // For non-live, we may have just reached the end of the track. That's normal and fine, drop it. + if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ dropTrack(nxt.tid, "end of VoD track reached", false); return false; } + // Check if there is a next page for the timestamp we're looking for. if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){ loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time)); nxt.offset = 0; //Only read the next time if the page load succeeded and there is a packet to read from if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){ nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0); + nxt.ghostPacket = false; + }else{ + nxt.ghostPacket = true; } buffer.replaceFirst(nxt); return false; } + // We're still on the same page; ghost packets should update their time and retry later + if (nxt.ghostPacket){ + nxt.time = M.getNowms(nxt.tid); + buffer.replaceFirst(nxt); + return false; + } if (nxt.offset >= curPage[nxt.tid].len){ INFO_MSG("Reading past end of page %s: %" PRIu64 " > %" PRIu64 " for time %" PRIu64 " on track %zu", curPage[nxt.tid].name.c_str(), nxt.offset, curPage[nxt.tid].len, nxt.time, nxt.tid); dropTrack(nxt.tid, "reading past end of page"); @@ -2016,6 +2012,7 @@ namespace Mist{ } // We know this packet will be valid, pre-load it so we know its length DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true); + nxt.time = preLoad.getTime(); nextTime = 0; @@ -2042,6 +2039,7 @@ namespace Mist{ if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){ thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); thisIdx = nxt.tid; + thisTime = nxt.time; dropTrack(nxt.tid, "end of non-live track reached", false); return true; } @@ -2068,6 +2066,9 @@ namespace Mist{ break;//Valid packet! } + // Force valid packet if nowMs is higher than current packet time + if (M.getNowms(nxt.tid) > nxt.time){break;} + //Okay, there's no next page yet, and no next packet on this page either. //That means we're waiting for data to show up, somewhere. @@ -2118,7 +2119,7 @@ namespace Mist{ } emptyCount = 0; // valid packet - reset empty counter thisIdx = nxt.tid; - thisTime = thisPacket.getTime(); + thisTime = nxt.time; if (!userSelect[nxt.tid]){ dropTrack(nxt.tid, "track is not alive!"); @@ -2134,7 +2135,14 @@ namespace Mist{ // we assume the next packet is the next on this same page nxt.offset += thisPacket.getDataLen(); - nxt.time = nextTime; + if (!nextTime){ + // If time is not known yet, insert a ghostPacket with a known safe time + nxt.time = M.getNowms(nxt.tid); + nxt.ghostPacket = true; + }else{ + nxt.time = nextTime; + nxt.ghostPacket = false; + } ++nxt.partIndex; // exchange the current packet in the buffer for the next one @@ -2425,11 +2433,11 @@ namespace Mist{ uint64_t oneTime = 0; uint64_t twoTime = 0; for (std::set::iterator it = vTracks.begin(); it != vTracks.end(); ++it){ - if (M.getLastms(*it) > oneTime){oneTime = M.getLastms(*it);} + if (M.getNowms(*it) > oneTime){oneTime = M.getNowms(*it);} } Util::wait(2000); for (std::set::iterator it = vTracks.begin(); it != vTracks.end(); ++it){ - if (M.getLastms(*it) > twoTime){twoTime = M.getLastms(*it);} + if (M.getNowms(*it) > twoTime){twoTime = M.getNowms(*it);} } if (twoTime <= oneTime+500){ disconnect(); diff --git a/src/output/output.h b/src/output/output.h index 15f617ee..795b545f 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -42,6 +42,7 @@ namespace Mist{ uint64_t currentTime(); uint64_t startTime(); uint64_t endTime(); + uint64_t targetTime(); void setBlocking(bool blocking); bool selectDefaultTracks(); bool connectToFile(std::string file, bool append = false, Socket::Connection *conn = 0); diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 18ba66c4..755d9e4f 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -10,7 +10,15 @@ namespace Mist{ HTTPOutput::HTTPOutput(Socket::Connection &conn) : Output(conn){ + //Websocket related webSock = 0; + wsCmds = false; + stayLive = true; + target_rate = 0.0; + forwardTo = 0; + prevVidTrack = INVALID_TRACK_ID; + + //General idleInterval = 0; idleLast = 0; if (config->getString("ip").size()){myConn.setHost(config->getString("ip"));} @@ -179,16 +187,84 @@ namespace Mist{ return ""; } + bool HTTPOutput::onFinish(){ + //If we're in the middle of sending a chunked reply, finish it cleanly and get read for the next request + if (!webSock && H.sendingChunks){ + H.Chunkify(0, 0, myConn); + wantRequest = true; + return true; + } + //If we're a websocket and handling commands, finish it cleanly too + if (webSock && wsCmds){ + JSON::Value r; + r["type"] = "on_stop"; + r["data"]["current"] = currentTime(); + r["data"]["begin"] = Output::startTime(); + r["data"]["end"] = Output::endTime(); + webSock->sendFrame(r.toString()); + parseData = false; + return false; + } + //All other cases call the parent finish handler + return Output::onFinish(); + } + + void HTTPOutput::sendNext(){ + //If we're not in websocket mode and handling commands, we do nothing here + if (!wsCmds || !webSock){return;} + + //Finish fast-forwarding if forwardTo time was reached + if (forwardTo && thisTime >= forwardTo){ + forwardTo = 0; + if (target_rate == 0.0){ + realTime = 1000;//set playback speed to default + firstTime = Util::bootMS() - thisTime; + maxSkipAhead = 0;//enable automatic rate control + }else{ + stayLive = false; + //Set new realTime speed + realTime = 1000 / target_rate; + firstTime = Util::bootMS() - (thisTime / target_rate); + maxSkipAhead = 1;//disable automatic rate control + } + JSON::Value r; + r["type"] = "set_speed"; + r["data"]["play_rate_prev"] = "fast-forward"; + if (target_rate == 0.0){ + r["data"]["play_rate_curr"] = "auto"; + }else{ + r["data"]["play_rate_curr"] = target_rate; + } + webSock->sendFrame(r.toString()); + } + + // Handle nice move-over to new main video track + if (prevVidTrack != INVALID_TRACK_ID && thisIdx != prevVidTrack && M.getType(thisIdx) == "video"){ + if (!thisPacket.getFlag("keyframe")){ + // Ignore the packet if not a keyframe + return; + } + dropTrack(prevVidTrack, "Smoothly switching to new video track", false); + prevVidTrack = INVALID_TRACK_ID; + handleWebsocketIdle(); + onIdle(); + sendHeader(); + } + } + void HTTPOutput::requestHandler(){ // Handle onIdle function caller, if needed if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){ + if (wsCmds){handleWebsocketIdle();} onIdle(); idleLast = Util::bootMS(); } // Handle websockets if (webSock){ if (webSock->readFrame()){ - onWebsocketFrame(); + if (!wsCmds || !handleWebsocketCommands()){ + onWebsocketFrame(); + } idleLast = Util::bootMS(); return; } @@ -270,6 +346,7 @@ namespace Mist{ if (H.GetVar("audio") != ""){targetParams["audio"] = H.GetVar("audio");} if (H.GetVar("video") != ""){targetParams["video"] = H.GetVar("video");} + if (H.GetVar("meta") != ""){targetParams["meta"] = H.GetVar("meta");} if (H.GetVar("subtitle") != ""){targetParams["subtitle"] = H.GetVar("subtitle");} if (H.GetVar("start") != ""){targetParams["start"] = H.GetVar("start");} if (H.GetVar("stop") != ""){targetParams["stop"] = H.GetVar("stop");} @@ -314,6 +391,13 @@ namespace Mist{ webSock = 0; return; } + //Generic websocket handling sets idle interval to 1s and changes name by appending "/WS" + if (wsCmds){ + idleInterval = 1000; + if (capa["name"].asStringRef().find("/WS") != std::string::npos){ + capa["name"] = capa["name"].asStringRef() + "/WS"; + } + } onWebsocketConnect(); H.Clean(); return; @@ -331,6 +415,361 @@ namespace Mist{ if (!sawRequest && !myConn.spool() && !isBlocking && !parseData){Util::sleep(100);} } + /// Handles standardized WebSocket commands. + /// Returns true if a command was executed, false otherwise. + bool HTTPOutput::handleWebsocketCommands(){ + //only handle text frames + if (webSock->frameType != 1){return false;} + + //Parse JSON and check command type + JSON::Value command = JSON::fromString(webSock->data, webSock->data.size()); + if (!command || !command.isMember("type")){return false;} + + //Seek command, for changing playback position + if (command["type"] == "seek") { + handleWebsocketSeek(command); + return true; + } + + //Pause command, toggles pause state + if (command["type"] == "pause") { + parseData = !parseData; + JSON::Value r; + r["type"] = "pause"; + r["paused"] = !parseData; + if (!parseData){ + //Store current target time into lastPacketTime when pausing + lastPacketTime = targetTime(); + }else{ + //On resume, restore the timing to be where it was when pausing + firstTime = Util::bootMS() - (lastPacketTime / target_rate); + } + webSock->sendFrame(r.toString()); + return true; + } + + //Hold command, forces pause state on + if (command["type"] == "hold") { + if (parseData){ + //Store current target time into lastPacketTime when pausing + lastPacketTime = targetTime(); + } + parseData = false; + webSock->sendFrame("{\"type\":\"pause\",\"paused\":true}"); + return true; + } + + //Tracks command, for (re)selecting tracks + if (command["type"] == "tracks") { + if (command.isMember("audio")){ + if (!command["audio"].isNull() && command["audio"] != "auto"){ + targetParams["audio"] = command["audio"].asString(); + }else{ + targetParams.erase("audio"); + } + } + if (command.isMember("video")){ + if (!command["video"].isNull() && command["video"] != "auto"){ + targetParams["video"] = command["video"].asString(); + }else{ + targetParams.erase("video"); + } + } + if (command.isMember("meta")){ + if (!command["meta"].isNull() && command["meta"] != "auto"){ + targetParams["meta"] = command["meta"].asString(); + }else{ + targetParams.erase("meta"); + } + } + if (command.isMember("seek_time")){ + possiblyReselectTracks(command["seek_time"].asInt()); + }else{ + possiblyReselectTracks(currentTime()); + } + return true; + } + + //Fast_forward command, fast-forwards to given timestamp and resume previous speed + if (command["type"] == "fast_forward"){ + if (command.isMember("ff_to")){ + forwardTo = command["ff_to"].asInt(); + if (forwardTo > currentTime()){ + realTime = 0; + }else{ + if (target_rate == 0.0){ + firstTime = Util::bootMS() - forwardTo; + }else{ + firstTime = Util::bootMS() - (forwardTo / target_rate); + } + forwardTo = 0; + } + }else{ + JSON::Value r; + r["type"] = "warning"; + r["warning"] = "Ignored fast_forward command: ff_to property missing"; + webSock->sendFrame(r.toString()); + } + onIdle(); + return true; + } + + //Set_speed command, changes playback speed + if (command["type"] == "set_speed") { + JSON::Value r; + r["type"] = "set_speed"; + if (!command.isMember("play_rate")){ + r["error"] = "play_rate missing"; + webSock->sendFrame(r.toString()); + return false; + } + + double set_rate = command["play_rate"].asDouble(); + if (!parseData){ + parseData = true; + selectDefaultTracks(); + } + + if (target_rate == 0.0){ + r["data"]["play_rate_prev"] = "auto"; + }else{ + r["data"]["play_rate_prev"] = target_rate; + } + if (set_rate == 0.0){ + r["data"]["play_rate_curr"] = "auto"; + }else{ + r["data"]["play_rate_curr"] = set_rate; + } + + if (target_rate != set_rate){ + uint64_t prevTargetTime = targetTime(); + target_rate = set_rate; + if (target_rate == 0.0){ + realTime = 1000;//set playback speed to default + firstTime = Util::bootMS() - prevTargetTime; + maxSkipAhead = 0;//enabled automatic rate control + }else{ + stayLive = false; + //Set new realTime speed + realTime = 1000 / target_rate; + firstTime = Util::bootMS() - (prevTargetTime / target_rate); + maxSkipAhead = 1;//disable automatic rate control + } + } + if (M.getLive()){r["data"]["live_point"] = stayLive;} + webSock->sendFrame(r.toString()); + handleWebsocketIdle(); + onIdle(); + return true; + } + + //Stop command, ends playback and disconnects the socket explicitly + if (command["type"] == "stop") { + Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "User requested stop"); + myConn.close(); + return true; + } + + //Play command, sets pause state off and optionally also seeks + if (command["type"] == "play") { + parseData = true; + if (command.isMember("seek_time")){ + handleWebsocketSeek(command); + }else{ + if (!currentTime()){ + command["seek_time"] = 0; + handleWebsocketSeek(command); + }else{ + parseData = true; + selectDefaultTracks(); + firstTime = Util::bootMS() - (lastPacketTime / target_rate); + } + } + return true; + } + + //Unhandled commands end up here + return false; + } + + void HTTPOutput::handleWebsocketIdle(){ + if (!webSock){return;} + if (!parseData){return;} + + //Finish fast-forwarding if forwardTo time was reached + if (forwardTo && targetTime() >= forwardTo){ + forwardTo = 0; + if (target_rate == 0.0){ + realTime = 1000;//set playback speed to default + firstTime = Util::bootMS() - targetTime(); + maxSkipAhead = 0;//enable automatic rate control + }else{ + stayLive = false; + //Set new realTime speed + realTime = 1000 / target_rate; + firstTime = Util::bootMS() - (targetTime() / target_rate); + maxSkipAhead = 1;//disable automatic rate control + } + JSON::Value r; + r["type"] = "set_speed"; + r["data"]["play_rate_prev"] = "fast-forward"; + if (target_rate == 0.0){ + r["data"]["play_rate_curr"] = "auto"; + }else{ + r["data"]["play_rate_curr"] = target_rate; + } + webSock->sendFrame(r.toString()); + } + + JSON::Value r; + r["type"] = "on_time"; + r["data"]["current"] = targetTime(); + r["data"]["next"] = currentTime(); + r["data"]["begin"] = Output::startTime(); + + r["data"]["end"] = Output::endTime(); + if (realTime == 0){ + r["data"]["play_rate_curr"] = "fast-forward"; + }else{ + if (target_rate == 0.0){ + r["data"]["play_rate_curr"] = "auto"; + }else{ + r["data"]["play_rate_curr"] = target_rate; + } + } + uint64_t jitter = 0; + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + r["data"]["tracks"].append((uint64_t)it->first); + if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);} + } + r["data"]["jitter"] = jitter; + if (M.getLive() && dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;} + if (capa.isMember("maxdelay") && capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;} + webSock->sendFrame(r.toString()); + } + + bool HTTPOutput::possiblyReselectTracks(uint64_t seekTarget){ + // Remember the previous video track, if any. + std::set prevSelTracks; + prevVidTrack = INVALID_TRACK_ID; + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + prevSelTracks.insert(it->first); + if (M.getType(it->first) == "video"){ + prevVidTrack = it->first; + } + } + if (!selectDefaultTracks()) { + prevVidTrack = INVALID_TRACK_ID; + handleWebsocketIdle(); + onIdle(); + return false; + } + if (seekTarget != currentTime()){prevVidTrack = INVALID_TRACK_ID;} + bool hasVideo = false; + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + if (M.getType(it->first) == "video"){hasVideo = true;} + } + // Add the previous video track back, if we had one. + if (prevVidTrack != INVALID_TRACK_ID && !userSelect.count(prevVidTrack) && hasVideo){ + userSelect[prevVidTrack].reload(streamName, prevVidTrack); + seek(seekTarget); + std::set newSelTracks; + newSelTracks.insert(prevVidTrack); + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + if (M.getType(it->first) != "video"){ + newSelTracks.insert(it->first); + } + } + if (prevSelTracks != newSelTracks){ + seek(seekTarget, true); + realTime = 0; + forwardTo = seekTarget; + sendHeader(); + JSON::Value r; + r["type"] = "set_speed"; + if (target_rate == 0.0){ + r["data"]["play_rate_prev"] = "auto"; + }else{ + r["data"]["play_rate_prev"] = target_rate; + } + r["data"]["play_rate_curr"] = "fast-forward"; + webSock->sendFrame(r.toString()); + } + }else{ + prevVidTrack = INVALID_TRACK_ID; + seek(seekTarget, true); + realTime = 0; + forwardTo = seekTarget; + sendHeader(); + JSON::Value r; + r["type"] = "set_speed"; + if (target_rate == 0.0){ + r["data"]["play_rate_prev"] = "auto"; + }else{ + r["data"]["play_rate_prev"] = target_rate; + } + r["data"]["play_rate_curr"] = "fast-forward"; + webSock->sendFrame(r.toString()); + } + handleWebsocketIdle(); + onIdle(); + return true; + } + + + bool HTTPOutput::handleWebsocketSeek(const JSON::Value& command) { + JSON::Value r; + r["type"] = "seek"; + if (!command.isMember("seek_time")){ + r["error"] = "seek_time missing"; + webSock->sendFrame(r.toString()); + return false; + } + + uint64_t seek_time = command["seek_time"].asInt(); + if (!parseData){ + parseData = true; + selectDefaultTracks(); + } + + stayLive = (target_rate == 0.0) && (Output::endTime() < seek_time + 5000); + if (command["seek_time"].asStringRef() == "live"){stayLive = true;} + if (stayLive){seek_time = Output::endTime();} + + if (!seek(seek_time, true)) { + r["error"] = "seek failed, continuing as-is"; + webSock->sendFrame(r.toString()); + return false; + } + if (M.getLive()){r["data"]["live_point"] = stayLive;} + if (target_rate == 0.0){ + r["data"]["play_rate_curr"] = "auto"; + }else{ + r["data"]["play_rate_curr"] = target_rate; + } + if (command.isMember("ff_to") || (seek_time >= 250 && currentTime() < seek_time - 250)){ + forwardTo = seek_time; + if (command.isMember("ff_to") && command["ff_to"].asInt() > forwardTo){ + forwardTo = command["ff_to"].asInt(); + } + if (forwardTo < currentTime()){ + if (target_rate == 0.0){ + firstTime = Util::bootMS() - forwardTo; + }else{ + firstTime = Util::bootMS() - (forwardTo / target_rate); + } + forwardTo = 0; + }else{ + realTime = 0; + r["data"]["play_rate_curr"] = "fast-forward"; + } + } + handleWebsocketIdle(); + onIdle(); + webSock->sendFrame(r.toString()); + return true; + } + /// Default HTTP handler. /// Only takes care of OPTIONS and HEAD, saving the original request, and calling respondHTTP void HTTPOutput::onHTTP(){ diff --git a/src/output/output_http.h b/src/output/output_http.h index 3acb1b01..76e09f80 100644 --- a/src/output/output_http.h +++ b/src/output/output_http.h @@ -15,18 +15,32 @@ namespace Mist{ virtual void onHTTP(); virtual void respondHTTP(const HTTP::Parser & req, bool headersOnly); virtual void onIdle(){}; - virtual void onWebsocketFrame(){}; - virtual void onWebsocketConnect(){}; - virtual void preWebsocketConnect(){}; virtual void requestHandler(); virtual void preHTTP(); + virtual bool onFinish(); + virtual void sendNext(); static bool listenMode(){return false;} - virtual bool doesWebsockets(){return false;} void reConnector(std::string &connector); std::string getHandler(); bool parseRange(std::string header, uint64_t &byteStart, uint64_t &byteEnd); + //WebSocket related + virtual bool doesWebsockets(){return false;} + virtual void onWebsocketFrame(){}; + virtual void onWebsocketConnect(){}; + virtual void preWebsocketConnect(){}; + bool handleWebsocketCommands(); + void handleWebsocketIdle(); + bool handleWebsocketSeek(const JSON::Value & command); + bool possiblyReselectTracks(uint64_t seekTarget); protected: + //WebSocket related + bool wsCmds; ///< If true, implements all our standard websocket-based seek/play/etc commands + double target_rate; ///< Target playback speed rate (1.0 = normal, 0 = auto) + uint64_t forwardTo; ///< Playback position we're fast-forwarding towards + size_t prevVidTrack; ///< Previously selected main video track + bool stayLive; ///< Whether or not we're trying to stay on the live-most point, for live streams + bool responded; HTTP::Parser H; HTTP::Websocket *webSock; diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 980662f0..697ee6ff 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -9,6 +9,8 @@ #include #include +bool includeZeroMatches = false; + namespace Mist{ /// Helper function to find the protocol entry for a given port number std::string getProtocolForPort(uint16_t portNo){ @@ -647,7 +649,7 @@ namespace Mist{ // loop over the added sources, add them to json_resp["sources"] for (std::set::iterator it = sources.begin(); it != sources.end(); it++){ - if ((*it)["simul_tracks"].asInt() > 0){ + if (includeZeroMatches || (*it)["simul_tracks"].asInt() > 0){ if (Comms::tknMode & 0x04){ JSON::Value tmp; tmp = (*it); @@ -664,6 +666,7 @@ namespace Mist{ void OutHTTP::respondHTTP(const HTTP::Parser & req, bool headersOnly){ origStreamName = streamName; + includeZeroMatches = req.GetVar("inclzero").size(); if (req.GetHeader("X-Mst-Path").size()){mistPath = req.GetHeader("X-Mst-Path");} diff --git a/src/output/output_httpts.cpp b/src/output/output_httpts.cpp index c9d28b26..7253f292 100644 --- a/src/output/output_httpts.cpp +++ b/src/output/output_httpts.cpp @@ -74,6 +74,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("+AC3"); capa["codecs"][0u][1u].append("+MP2"); capa["codecs"][0u][1u].append("+opus"); + capa["codecs"][0u][2u].append("+JSON"); capa["codecs"][1u][0u].append("rawts"); capa["methods"][0u]["handler"] = "http"; capa["methods"][0u]["type"] = "html5/video/mpeg"; diff --git a/src/output/output_json.cpp b/src/output/output_json.cpp index de6643e5..10208aca 100644 --- a/src/output/output_json.cpp +++ b/src/output/output_json.cpp @@ -5,6 +5,7 @@ namespace Mist{ OutJSON::OutJSON(Socket::Connection &conn) : HTTPOutput(conn){ + wsCmds = true; realTime = 0; bootMsOffset = 0; keepReselecting = false; @@ -19,7 +20,8 @@ namespace Mist{ capa["friendly"] = "JSON over HTTP"; capa["desc"] = "Pseudostreaming in JSON format over HTTP"; capa["url_match"] = "/$.json"; - capa["codecs"][0u][0u].append("@+meta"); + capa["codecs"][0u][0u].append("@meta"); + capa["codecs"][0u][0u].append("subtitle"); capa["methods"][0u]["handler"] = "http"; capa["methods"][0u]["type"] = "html5/text/javascript"; capa["methods"][0u]["hrn"] = "JSON progressive"; @@ -33,6 +35,9 @@ namespace Mist{ } void OutJSON::sendNext(){ + //Call parent handler for generic websocket handling + HTTPOutput::sendNext(); + if (keepReselecting){ // If we can select more tracks, do it and continue. if (selectDefaultTracks()){ @@ -44,11 +49,31 @@ namespace Mist{ char *dPtr; size_t dLen; thisPacket.getString("data", dPtr, dLen); + if (dLen == 0 || (dLen == 1 && dPtr[0] == ' ')){return;} jPack["data"] = JSON::fromString(dPtr, dLen); - jPack["time"] = thisPacket.getTime(); + jPack["time"] = thisTime; jPack["track"] = (uint64_t)thisIdx; + }else if (M.getCodec(thisIdx) == "subtitle"){ + char *dPtr; + size_t dLen; + thisPacket.getString("data", dPtr, dLen); + + //Ignore blank subtitles + if (dLen == 0 || (dLen == 1 && dPtr[0] == ' ')){return;} + + //Get duration, or calculate if missing + uint64_t duration = thisPacket.getInt("duration"); + if (!duration){duration = dLen * 75 + 800;} + + //Build JSON data to transmit + jPack["duration"] = duration; + jPack["time"] = thisTime; + jPack["track"] = (uint64_t)thisIdx; + jPack["data"] = std::string(dPtr, dLen); }else{ jPack = thisPacket.toJSON(); + jPack.removeMember("bpos"); + jPack["generic_converter_used"] = true; } if (dupcheck){ if (jPack.compareExcept(lastVal, nodup)){ @@ -75,13 +100,14 @@ namespace Mist{ } void OutJSON::sendHeader(){ + sentHeader = true; + if (webSock){return;} std::string method = H.method; H.Clean(); H.SetHeader("Content-Type", "text/javascript"); H.protocol = "HTTP/1.0"; H.setCORSHeaders(); H.SendResponse("200", "OK", myConn); - sentHeader = true; } void OutJSON::onFail(const std::string &msg, bool critical){ @@ -203,9 +229,11 @@ namespace Mist{ /// Repeats last JSON packet every 5 seconds to keep stream alive. void OutJSON::onIdle(){ - lastOutTime += (Util::bootMS() - lastSendTime); - lastSendTime = Util::bootMS(); - bufferLivePacket(lastOutTime, 0, pushTrack, lastOutData.data(), lastOutData.size(), 0, true); + if (isPushing()){ + lastOutTime += (Util::bootMS() - lastSendTime); + lastSendTime = Util::bootMS(); + bufferLivePacket(lastOutTime, 0, pushTrack, lastOutData.data(), lastOutData.size(), 0, true); + } } void OutJSON::onHTTP(){ diff --git a/src/output/output_jsonline.cpp b/src/output/output_jsonline.cpp new file mode 100644 index 00000000..653d6bad --- /dev/null +++ b/src/output/output_jsonline.cpp @@ -0,0 +1,112 @@ +#include "output_jsonline.h" +#include +#include +#include +#include +#include + +namespace Mist{ + OutJSONLine::OutJSONLine(Socket::Connection &conn) : Output(conn){ + trkIdx = INVALID_TRACK_ID; + streamName = config->getString("streamname"); + wantRequest = true; + parseData = false; + if (Triggers::shouldTrigger("PUSH_REWRITE")){ + std::string payload = "jsonline://" + myConn.getBoundAddress() + ":" + config->getOption("port").asString() + "\n" + getConnectedHost() + "\n" + streamName; + std::string newStream = streamName; + Triggers::doTrigger("PUSH_REWRITE", payload, "", false, newStream); + if (!newStream.size()){ + FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL", + getConnectedHost().c_str(), reqUrl.c_str()); + config->is_active = false; + return; + }else{ + streamName = newStream; + Util::sanitizeName(streamName); + Util::setStreamName(streamName); + } + } + if (!allowPush("")){ + FAIL_MSG("Pushing not allowed"); + config->is_active = false; + return; + } + initialize(); + trkIdx = meta.addTrack(); + meta.setType(trkIdx, "meta"); + meta.setCodec(trkIdx, config->getString("codec")); + meta.setID(trkIdx, 1); + offset = M.getBootMsOffset(); + myConn.setBlocking(false); + INFO_MSG("%s track index is %zu", config->getString("codec").c_str(), trkIdx); + } + + OutJSONLine::~OutJSONLine(){ + if (trkIdx != INVALID_TRACK_ID && M){ + meta.abandonTrack(trkIdx); + } + } + + void OutJSONLine::init(Util::Config *cfg){ + Output::init(cfg); + capa["name"] = "JSONLine"; + capa["friendly"] = "JSON lines over raw TCP"; + capa["desc"] = "Real time JSON line-by-line input over a raw TCP socket or standard input"; + capa["deps"] = ""; + capa["required"]["streamname"]["name"] = "Stream"; + capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add " + "this protocol multiple times using different ports."; + capa["required"]["streamname"]["type"] = "str"; + capa["required"]["streamname"]["option"] = "--stream"; + capa["required"]["streamname"]["short"] = "s"; + + cfg->addOption("codec", + JSON::fromString("{\"arg\":\"string\",\"default\":\"JSON\",\"short\":\"c\",\"long\":" + "\"codec\",\"help\":\"Codec to use for data ingest, JSON by default\"}")); + capa["optional"]["codec"]["name"] = "Codec"; + capa["optional"]["codec"]["help"] = "What codec to ingest as"; + capa["optional"]["codec"]["default"] = "JSON"; + capa["optional"]["codec"]["type"] = "str"; + capa["optional"]["codec"]["option"] = "--codec"; + capa["optional"]["codec"]["short"] = "c"; + + capa["codecs"][0u][0u].append("JSON"); + cfg->addConnectorOptions(3456, capa); + config = cfg; + } + + void OutJSONLine::sendNext(){ + } + + bool OutJSONLine::listenMode(){return true;} + + void OutJSONLine::requestHandler(){ + if (myConn.spool()){ + while (myConn.Received().size()){ + dPtr.append(myConn.Received().get()); + myConn.Received().get().clear(); + if (dPtr.size() && dPtr[dPtr.size() - 1] == '\n'){ + thisTime = Util::bootMS() - offset; + thisIdx = trkIdx; + thisPacket.genericFill(thisTime, 0, 1, dPtr, dPtr.size(), 0, true); + bufferLivePacket(thisPacket); + dPtr.truncate(0); + } + } + }else{ + meta.setNowms(trkIdx, Util::bootMS() - offset); + Util::sleep(10); + } + } + + std::string OutJSONLine::getStatsName(){ + if (!parseData){ + return "INPUT:" + capa["name"].asStringRef(); + }else{ + return Output::getStatsName(); + } + } + + bool OutJSONLine::isReadyForPlay(){return true;} + +}// namespace Mist diff --git a/src/output/output_jsonline.h b/src/output/output_jsonline.h new file mode 100644 index 00000000..695d5cef --- /dev/null +++ b/src/output/output_jsonline.h @@ -0,0 +1,26 @@ +#include "output.h" + +namespace Mist{ + class OutJSONLine : public Output{ + public: + OutJSONLine(Socket::Connection &conn); + ~OutJSONLine(); + static void init(Util::Config *cfg); + void sendNext(); + static bool listenMode(); + bool isReadyForPlay(); + void requestHandler(); + private: + std::string getStatsName(); + Util::ResizeablePointer dPtr; + size_t trkIdx; + uint64_t offset; + + protected: + inline virtual bool keepGoing(){ + return config->is_active && (!listenMode() || myConn); + } + }; +}// namespace Mist + +typedef Mist::OutJSONLine mistOut; diff --git a/src/output/output_mp4.cpp b/src/output/output_mp4.cpp index 72c1e7a4..9cac582b 100644 --- a/src/output/output_mp4.cpp +++ b/src/output/output_mp4.cpp @@ -106,14 +106,11 @@ namespace Mist{ } OutMP4::OutMP4(Socket::Connection &conn) : HTTPOutput(conn){ - prevVidTrack = INVALID_TRACK_ID; + wsCmds = true; nextHeaderTime = 0xffffffffffffffffull; startTime = 0; endTime = 0xffffffffffffffffull; realBaseOffset = 1; - stayLive = true; - target_rate = 0.0; - forwardTo = 0; } OutMP4::~OutMP4(){} @@ -1197,12 +1194,8 @@ namespace Mist{ } void OutMP4::sendNext(){ - - if (!thisPacket.getData()) { - FAIL_MSG("`thisPacket.getData()` is invalid."); - return; - } - + //Call parent handler for generic websocket handling + HTTPOutput::sendNext(); // Obtain a pointer to the data of this packet char *dataPointer = 0; size_t len = 0; @@ -1210,68 +1203,6 @@ namespace Mist{ // WebSockets send each packet directly. The packet is constructed in `appendSinglePacketMoof()`. if (webSock) { - - if (forwardTo && currentTime() >= forwardTo){ - forwardTo = 0; - if (target_rate == 0.0){ - realTime = 1000;//set playback speed to default - firstTime = Util::bootMS() - currentTime(); - maxSkipAhead = 0;//enabled automatic rate control - }else{ - stayLive = false; - //Set new realTime speed - realTime = 1000 / target_rate; - firstTime = Util::bootMS() - (currentTime() / target_rate); - maxSkipAhead = 1;//disable automatic rate control - } - JSON::Value r; - r["type"] = "set_speed"; - r["data"]["play_rate_prev"] = "fast-forward"; - if (target_rate == 0.0){ - r["data"]["play_rate_curr"] = "auto"; - }else{ - r["data"]["play_rate_curr"] = target_rate; - } - webSock->sendFrame(r.toString()); - } - - // Handle nice move-over to new track ID - if (prevVidTrack != INVALID_TRACK_ID && thisIdx != prevVidTrack && M.getType(thisIdx) == "video"){ - if (!thisPacket.getFlag("keyframe")){ - // Ignore the packet if not a keyframe - return; - } - dropTrack(prevVidTrack, "Smoothly switching to new video track", false); - prevVidTrack = INVALID_TRACK_ID; - onIdle(); - sendHeader(); - -/* - MP4::AVCC avccbox; - avccbox.setPayload(M.getInit(thisIdx)); - std::string bs = avccbox.asAnnexB(); - static Util::ResizeablePointer initBuf; - initBuf.assign(0,0); - initBuf.allocate(bs.size()); - char * ib = initBuf; - initBuf.append(0, nalu::fromAnnexB(bs.data(), bs.size(), ib)); - - webBuf.truncate(0); - appendSinglePacketMoof(webBuf, bs.size()); - - char mdatHeader[8] ={0x00, 0x00, 0x00, 0x00, 'm', 'd', 'a', 't'}; - Bit::htobl(mdatHeader, 8 + len); //8 bytes for the header + length of data. - webBuf.append(mdatHeader, 8); - webBuf.append(dataPointer, len); - webBuf.append(initBuf, initBuf.size()); - webSock->sendFrame(webBuf, webBuf.size(), 2); - return; -*/ - - - } - - webBuf.truncate(0); appendSinglePacketMoof(webBuf); @@ -1415,9 +1346,6 @@ namespace Mist{ } webSock->sendFrame(headerData, headerData.size(), 2); - std::ofstream bleh("/tmp/bleh.mp4"); - bleh.write(headerData, headerData.size()); - bleh.close(); sentHeader = true; return; } @@ -1448,24 +1376,15 @@ namespace Mist{ } void OutMP4::onWebsocketConnect() { - capa["name"] = "MP4/WS"; capa["maxdelay"] = 5000; fragSeqNum = 0; - idleInterval = 1000; maxSkipAhead = 0; if (M.getLive()){dataWaitTimeout = 450;} } void OutMP4::onWebsocketFrame() { - JSON::Value command = JSON::fromString(webSock->data, webSock->data.size()); - if (!command.isMember("type")) { - JSON::Value r; - r["type"] = "error"; - r["data"] = "type field missing from command"; - webSock->sendFrame(r.toString()); - return; - } + if (!command.isMember("type")) {return;} if (command["type"] == "request_codec_data") { //If no supported codecs are passed, assume autodetected capabilities @@ -1490,119 +1409,10 @@ namespace Mist{ selectDefaultTracks(); initialSeek(); sendHeader(); - }else if (command["type"] == "seek") { - handleWebsocketSeek(command); - }else if (command["type"] == "pause") { - parseData = !parseData; - JSON::Value r; - r["type"] = "pause"; - r["paused"] = !parseData; - //Make sure we reset our timing code, too - if (parseData){ - firstTime = Util::bootMS() - (currentTime() / target_rate); - } - webSock->sendFrame(r.toString()); - }else if (command["type"] == "hold") { - parseData = false; - webSock->sendFrame("{\"type\":\"pause\",\"paused\":true}"); - }else if (command["type"] == "tracks") { - if (command.isMember("audio")){ - if (!command["audio"].isNull() && command["audio"] != "auto"){ - targetParams["audio"] = command["audio"].asString(); - }else{ - targetParams.erase("audio"); - } - } - if (command.isMember("video")){ - if (!command["video"].isNull() && command["video"] != "auto"){ - targetParams["video"] = command["video"].asString(); - }else{ - targetParams.erase("video"); - } - } - if (command.isMember("seek_time")){ - possiblyReselectTracks(command["seek_time"].asInt()); - }else{ - possiblyReselectTracks(currentTime()); - } return; - }else if (command["type"] == "set_speed") { - handleWebsocketSetSpeed(command); - }else if (command["type"] == "stop") { - Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "User requested stop"); - myConn.close(); - }else if (command["type"] == "play") { - parseData = true; - if (command.isMember("seek_time")){handleWebsocketSeek(command);} } } - bool OutMP4::possiblyReselectTracks(uint64_t seekTarget){ - // Remember the previous video track, if any. - std::set prevSelTracks; - prevVidTrack = INVALID_TRACK_ID; - for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - prevSelTracks.insert(it->first); - if (M.getType(it->first) == "video"){ - prevVidTrack = it->first; - } - } - if (!selectDefaultTracks()) { - prevVidTrack = INVALID_TRACK_ID; - onIdle(); - return false; - } - if (seekTarget != currentTime()){prevVidTrack = INVALID_TRACK_ID;} - bool hasVideo = false; - for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - if (M.getType(it->first) == "video"){hasVideo = true;} - } - // Add the previous video track back, if we had one. - if (prevVidTrack != INVALID_TRACK_ID && !userSelect.count(prevVidTrack) && hasVideo){ - userSelect[prevVidTrack].reload(streamName, prevVidTrack); - seek(seekTarget); - std::set newSelTracks; - newSelTracks.insert(prevVidTrack); - for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - if (M.getType(it->first) != "video"){ - newSelTracks.insert(it->first); - } - } - if (prevSelTracks != newSelTracks){ - seek(seekTarget, true); - realTime = 0; - forwardTo = seekTarget; - sendHeader(); - JSON::Value r; - r["type"] = "set_speed"; - if (target_rate == 0.0){ - r["data"]["play_rate_prev"] = "auto"; - }else{ - r["data"]["play_rate_prev"] = target_rate; - } - r["data"]["play_rate_curr"] = "fast-forward"; - webSock->sendFrame(r.toString()); - } - }else{ - prevVidTrack = INVALID_TRACK_ID; - seek(seekTarget, true); - realTime = 0; - forwardTo = seekTarget; - sendHeader(); - JSON::Value r; - r["type"] = "set_speed"; - if (target_rate == 0.0){ - r["data"]["play_rate_prev"] = "auto"; - }else{ - r["data"]["play_rate_prev"] = target_rate; - } - r["data"]["play_rate_curr"] = "fast-forward"; - webSock->sendFrame(r.toString()); - } - onIdle(); - return true; - } - void OutMP4::sendWebsocketCodecData(const std::string& type) { JSON::Value r; r["type"] = type; @@ -1627,136 +1437,6 @@ namespace Mist{ webSock->sendFrame(r.toString()); } - bool OutMP4::handleWebsocketSeek(JSON::Value& command) { - JSON::Value r; - r["type"] = "seek"; - if (!command.isMember("seek_time")){ - r["error"] = "seek_time missing"; - webSock->sendFrame(r.toString()); - return false; - } - - uint64_t seek_time = command["seek_time"].asInt(); - if (!parseData){ - parseData = true; - selectDefaultTracks(); - } - - stayLive = (target_rate == 0.0) && (Output::endTime() < seek_time + 5000); - if (command["seek_time"].asStringRef() == "live"){stayLive = true;} - if (stayLive){seek_time = Output::endTime();} - - if (!seek(seek_time, true)) { - r["error"] = "seek failed, continuing as-is"; - webSock->sendFrame(r.toString()); - return false; - } - if (M.getLive()){r["data"]["live_point"] = stayLive;} - if (target_rate == 0.0){ - r["data"]["play_rate_curr"] = "auto"; - }else{ - r["data"]["play_rate_curr"] = target_rate; - } - if (seek_time >= 250 && currentTime() < seek_time - 250){ - forwardTo = seek_time; - realTime = 0; - r["data"]["play_rate_curr"] = "fast-forward"; - } - onIdle(); - webSock->sendFrame(r.toString()); - return true; - } - - bool OutMP4::handleWebsocketSetSpeed(JSON::Value& command) { - JSON::Value r; - r["type"] = "set_speed"; - if (!command.isMember("play_rate")){ - r["error"] = "play_rate missing"; - webSock->sendFrame(r.toString()); - return false; - } - - double set_rate = command["play_rate"].asDouble(); - if (!parseData){ - parseData = true; - selectDefaultTracks(); - } - - if (target_rate == 0.0){ - r["data"]["play_rate_prev"] = "auto"; - }else{ - r["data"]["play_rate_prev"] = target_rate; - } - if (set_rate == 0.0){ - r["data"]["play_rate_curr"] = "auto"; - }else{ - r["data"]["play_rate_curr"] = set_rate; - } - - if (target_rate != set_rate){ - target_rate = set_rate; - if (target_rate == 0.0){ - realTime = 1000;//set playback speed to default - firstTime = Util::bootMS() - currentTime(); - maxSkipAhead = 0;//enabled automatic rate control - }else{ - stayLive = false; - //Set new realTime speed - realTime = 1000 / target_rate; - firstTime = Util::bootMS() - (currentTime() / target_rate); - maxSkipAhead = 1;//disable automatic rate control - } - } - if (M.getLive()){r["data"]["live_point"] = stayLive;} - webSock->sendFrame(r.toString()); - onIdle(); - return true; - } - - void OutMP4::onIdle() { - if (!webSock){return;} - if (!parseData){return;} - JSON::Value r; - r["type"] = "on_time"; - r["data"]["current"] = currentTime(); - r["data"]["begin"] = Output::startTime(); - r["data"]["end"] = Output::endTime(); - if (realTime == 0){ - r["data"]["play_rate_curr"] = "fast-forward"; - }else{ - if (target_rate == 0.0){ - r["data"]["play_rate_curr"] = "auto"; - }else{ - r["data"]["play_rate_curr"] = target_rate; - } - } - uint64_t jitter = 0; - for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ - r["data"]["tracks"].append((uint64_t)it->first); - if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);} - } - r["data"]["jitter"] = jitter; - if (M.getLive() && dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;} - if (capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;} - webSock->sendFrame(r.toString()); - } - - bool OutMP4::onFinish() { - if (!webSock){ - H.Chunkify(0, 0, myConn); - wantRequest = true; - return true; - } - JSON::Value r; - r["type"] = "on_stop"; - r["data"]["current"] = currentTime(); - r["data"]["begin"] = Output::startTime(); - r["data"]["end"] = Output::endTime(); - webSock->sendFrame(r.toString()); - parseData = false; - return false; - } - void OutMP4::dropTrack(size_t trackId, const std::string &reason, bool probablyBad){ if (webSock && (reason == "EOP: data wait timeout" || reason == "disappeared from metadata") && possiblyReselectTracks(currentTime())){ return; diff --git a/src/output/output_mp4.h b/src/output/output_mp4.h index 05e1c927..3176c8a6 100644 --- a/src/output/output_mp4.h +++ b/src/output/output_mp4.h @@ -106,16 +106,10 @@ namespace Mist{ bool doesWebsockets() { return true; } void onWebsocketConnect(); void onWebsocketFrame(); - void onIdle(); - virtual bool onFinish(); virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true); protected: - bool possiblyReselectTracks(uint64_t seekTarget); void sendWebsocketCodecData(const std::string& type); bool handleWebsocketSeek(JSON::Value& command); - bool handleWebsocketSetSpeed(JSON::Value& command); - bool stayLive; - double target_rate; ///< Target playback speed rate (1.0 = normal, 0 = auto) uint64_t fileSize; uint64_t byteStart; @@ -123,11 +117,9 @@ namespace Mist{ int64_t leftOver; uint64_t currPos; uint64_t seekPoint; - uint64_t forwardTo; uint64_t nextHeaderTime; uint64_t headerSize; - size_t prevVidTrack; // variables for standard MP4 std::set sortSet; // needed for unfragmented MP4, remembers the order of keyparts diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 39277cb0..dcc9b2c0 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -1707,7 +1707,7 @@ namespace Mist{ size_t reTrack = next.cs_id * 3 + (F.data[0] == 0x09 ? 1 : (F.data[0] == 0x08 ? 2 : 3)); if (!reTrackToID.count(reTrack)){reTrackToID[reTrack] = INVALID_TRACK_ID;} F.toMeta(meta, *amf_storage, reTrackToID[reTrack], targetParams); - if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ + if ((F.getDataLen() || (amf_storage && amf_storage->hasContent())) && !(F.needsInitData() && F.isInitData())){ uint64_t tagTime = next.timestamp; uint64_t timeOffset = 0; if (targetParams.count("timeoffset")){ @@ -1778,8 +1778,22 @@ namespace Mist{ } } ltt = tagTime; - // bufferLivePacket(thisPacket); - bufferLivePacket(tagTime, F.offset(), idx, F.getData(), F.getDataLen(), 0, F.isKeyframe); + if (ltt){ + for (std::map::iterator it = lastTagTime.begin(); it != lastTagTime.end(); ++it){ + if (it->second == reTrack){continue;} + size_t iIdx = reTrackToID[it->second]; + if (it->first < ltt){ + meta.setNowms(iIdx, ltt-1); + it->second = ltt-1; + } + } + } + if (F.data[0] == 0x12 && amf_storage){ + std::string mData = amf_storage->toJSON().toString(); + bufferLivePacket(tagTime, F.offset(), idx, mData.c_str(), mData.size(), 0, true); + }else{ + bufferLivePacket(tagTime, F.offset(), idx, F.getData(), F.getDataLen(), 0, F.isKeyframe); + } if (!meta){config->is_active = false;} } break; diff --git a/src/output/output_srt.cpp b/src/output/output_srt.cpp index 81994a07..b4e8c6c2 100644 --- a/src/output/output_srt.cpp +++ b/src/output/output_srt.cpp @@ -30,6 +30,12 @@ namespace Mist{ } void OutSRT::sendNext(){ + // Reached the end we wanted? Stop here. + if (filter_to > 0 && thisTime > filter_to && filter_to > filter_from){ + config->is_active = false; + return; + } + char *dataPointer = 0; size_t len = 0; thisPacket.getString("data", dataPointer, len); @@ -37,35 +43,22 @@ namespace Mist{ if (len == 0 || (len == 1 && dataPointer[0] == ' ')){return;} std::stringstream tmp; if (!webVTT){tmp << lastNum++ << std::endl;} - uint64_t time = thisPacket.getTime(); - - // filter subtitle in specific timespan - if (filter_from > 0 && time < filter_from){ - index++; // when using seek, the index is lost. - seek(filter_from); - return; - } - - if (filter_to > 0 && time > filter_to && filter_to > filter_from){ - config->is_active = false; - return; - } char tmpBuf[50]; size_t tmpLen = - sprintf(tmpBuf, "%.2" PRIu64 ":%.2" PRIu64 ":%.2" PRIu64 ".%.3" PRIu64, (time / 3600000), - ((time % 3600000) / 60000), (((time % 3600000) % 60000) / 1000), time % 1000); + sprintf(tmpBuf, "%.2" PRIu64 ":%.2" PRIu64 ":%.2" PRIu64 ".%.3" PRIu64, (thisTime / 3600000), + ((thisTime % 3600000) / 60000), (((thisTime % 3600000) % 60000) / 1000), thisTime % 1000); tmp.write(tmpBuf, tmpLen); tmp << " --> "; - time += thisPacket.getInt("duration"); - if (time == thisPacket.getTime()){time += len * 75 + 800;} + uint64_t time = thisTime + thisPacket.getInt("duration"); + if (time == thisTime){time += len * 75 + 800;} tmpLen = sprintf(tmpBuf, "%.2" PRIu64 ":%.2" PRIu64 ":%.2" PRIu64 ".%.3" PRIu64, (time / 3600000), ((time % 3600000) / 60000), (((time % 3600000) % 60000) / 1000), time % 1000); tmp.write(tmpBuf, tmpLen); tmp << std::endl; myConn.SendNow(tmp.str()); - // prevent double newlines - if (dataPointer[len - 1] == '\n'){--dataPointer;} + // prevent extra newlines + while (len && dataPointer[len - 1] == '\n'){--len;} myConn.SendNow(dataPointer, len); myConn.SendNow("\n\n"); } @@ -82,7 +75,7 @@ namespace Mist{ void OutSRT::onHTTP(){ std::string method = H.method; webVTT = (H.url.find(".vtt") != std::string::npos) || (H.url.find(".webvtt") != std::string::npos); - if (H.GetVar("track") != ""){ + if (H.GetVar("track").size()){ size_t tid = atoll(H.GetVar("track").c_str()); if (M.getValidTracks().count(tid)){ userSelect.clear(); @@ -94,7 +87,10 @@ namespace Mist{ filter_to = 0; index = 0; - if (H.GetVar("from") != ""){filter_from = JSON::Value(H.GetVar("from")).asInt();} + if (H.GetVar("from") != ""){ + filter_from = JSON::Value(H.GetVar("from")).asInt(); + seek(filter_from); + } if (H.GetVar("to") != ""){filter_to = JSON::Value(H.GetVar("to")).asInt();} if (filter_to){realTime = 0;} diff --git a/src/output/output_ts.cpp b/src/output/output_ts.cpp index bf74ab3a..a476bec9 100644 --- a/src/output/output_ts.cpp +++ b/src/output/output_ts.cpp @@ -180,6 +180,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("+AC3"); capa["codecs"][0u][1u].append("+MP2"); capa["codecs"][0u][1u].append("+opus"); + capa["codecs"][0u][2u].append("+JSON"); capa["codecs"][1u][0u].append("rawts"); cfg->addConnectorOptions(8888, capa); config = cfg; @@ -194,6 +195,25 @@ namespace Mist{ opt["arg_num"] = 1; opt["help"] = "Target tsudp:// or tsrtp:// or tstcp:// URL to push out towards."; cfg->addOption("target", opt); + + capa["optional"]["datatrack"]["name"] = "MPEG Data track parser"; + capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks"; + capa["optional"]["datatrack"]["type"] = "select"; + capa["optional"]["datatrack"]["option"] = "--datatrack"; + capa["optional"]["datatrack"]["short"] = "D"; + capa["optional"]["datatrack"]["default"] = ""; + capa["optional"]["datatrack"]["select"][0u][0u] = ""; + capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled"; + capa["optional"]["datatrack"]["select"][1u][0u] = "json"; + capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON"; + + opt.null(); + opt["long"] = "datatrack"; + opt["short"] = "D"; + opt["arg"] = "string"; + opt["default"] = ""; + opt["help"] = "Which parser to use for data tracks"; + config->addOption("datatrack", opt); } void OutTS::initialSeek(){ @@ -290,6 +310,9 @@ namespace Mist{ onFinish(); return; } + if (config->getString("datatrack") == "json"){ + tsIn.setRawDataParser(TS::JSON); + } } // we now know we probably have a packet ready at the next 188 bytes // remove from buffer and insert into TS input diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index 695eab24..80d40238 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -226,8 +226,14 @@ namespace Mist{ fillPacket(dataPointer, dataLen, firstPack, video, keyframe, pkgPid, contPkg); }else if (type == "meta"){ long unsigned int tempLen = dataLen; + if (codec == "JSON"){tempLen += 2;} bs = TS::Packet::getPESMetaLeadIn(tempLen, packTime, M.getBps(thisIdx)); fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); + if (codec == "JSON"){ + char dLen[2]; + Bit::htobs(dLen, dataLen); + fillPacket(dLen, 2, firstPack, video, keyframe, pkgPid, contPkg); + } fillPacket(dataPointer, dataLen, firstPack, video, keyframe, pkgPid, contPkg); } if (packData.getBytesFree() < 184){ diff --git a/src/output/output_tsrist.cpp b/src/output/output_tsrist.cpp index fa00a379..f26d9fc0 100644 --- a/src/output/output_tsrist.cpp +++ b/src/output/output_tsrist.cpp @@ -181,6 +181,9 @@ namespace Mist{ onFinish(); return; } + if (config->getString("datatrack") == "json"){ + tsIn.setRawDataParser(TS::JSON); + } parseData = false; wantRequest = true; @@ -225,6 +228,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("+AC3"); capa["codecs"][0u][1u].append("+MP2"); capa["codecs"][0u][1u].append("+opus"); + capa["codecs"][0u][2u].append("+JSON"); capa["codecs"][1u][0u].append("rawts"); capa["optional"]["profile"]["name"] = "RIST profile"; @@ -283,6 +287,25 @@ namespace Mist{ opt["arg_num"] = 1; opt["help"] = "Target rist:// URL to push out towards."; cfg->addOption("target", opt); + + opt.null(); + opt["long"] = "datatrack"; + opt["short"] = "D"; + opt["arg"] = "string"; + opt["default"] = ""; + opt["help"] = "Which parser to use for data tracks"; + config->addOption("datatrack", opt); + + capa["optional"]["datatrack"]["name"] = "MPEG Data track parser"; + capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks"; + capa["optional"]["datatrack"]["type"] = "select"; + capa["optional"]["datatrack"]["option"] = "--datatrack"; + capa["optional"]["datatrack"]["short"] = "D"; + capa["optional"]["datatrack"]["default"] = ""; + capa["optional"]["datatrack"]["select"][0u][0u] = ""; + capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled"; + capa["optional"]["datatrack"]["select"][1u][0u] = "json"; + capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON"; } // Buffers TS packets and sends after 7 are buffered. diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp index fb76ba5f..38951753 100644 --- a/src/output/output_tssrt.cpp +++ b/src/output/output_tssrt.cpp @@ -119,6 +119,9 @@ namespace Mist{ onFinish(); return; } + if (config->getString("datatrack") == "json"){ + tsIn.setRawDataParser(TS::JSON); + } parseData = false; wantRequest = true; } @@ -209,6 +212,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("MP2"); capa["codecs"][0u][1u].append("opus"); + capa["codecs"][0u][2u].append("JSON"); capa["codecs"][1u][0u].append("rawts"); cfg->addConnectorOptions(8889, capa); config = cfg; @@ -279,6 +283,25 @@ namespace Mist{ opt["arg_num"] = 1; opt["help"] = "Target srt:// URL to push out towards."; cfg->addOption("target", opt); + + capa["optional"]["datatrack"]["name"] = "MPEG Data track parser"; + capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks"; + capa["optional"]["datatrack"]["type"] = "select"; + capa["optional"]["datatrack"]["option"] = "--datatrack"; + capa["optional"]["datatrack"]["short"] = "D"; + capa["optional"]["datatrack"]["default"] = ""; + capa["optional"]["datatrack"]["select"][0u][0u] = ""; + capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled"; + capa["optional"]["datatrack"]["select"][1u][0u] = "json"; + capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON"; + + opt.null(); + opt["long"] = "datatrack"; + opt["short"] = "D"; + opt["arg"] = "string"; + opt["default"] = ""; + opt["help"] = "Which parser to use for data tracks"; + config->addOption("datatrack", opt); } // Buffers TS packets and sends after 7 are buffered.