From e217f41f170ae542b3a9a8f157779cb98e96d743 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Tue, 12 Nov 2019 14:03:35 +0100 Subject: [PATCH] CMAF Push Output --- lib/cmaf.cpp | 161 ++++++++++++++++++++++++++--- lib/cmaf.h | 8 +- lib/mp4.cpp | 1 + lib/mp4_generic.cpp | 104 +++++++++++++++++-- lib/mp4_generic.h | 20 +++- src/output/output_cmaf.cpp | 207 ++++++++++++++++++++++++++++++++++++- src/output/output_cmaf.h | 34 ++++++ 7 files changed, 509 insertions(+), 26 deletions(-) diff --git a/lib/cmaf.cpp b/lib/cmaf.cpp index 82caa0e7..37776f66 100644 --- a/lib/cmaf.cpp +++ b/lib/cmaf.cpp @@ -1,14 +1,20 @@ #include "cmaf.h" namespace CMAF{ - size_t payloadSize(const DTSC::Meta &M, size_t track, size_t fragment){ + /// Function to determine the payload size of a CMAF fragment. + /// \parm isKeyIndex indicates whether we are sending DTSC Fragment or DTSC Key based CMAF fragments. + size_t payloadSize(const DTSC::Meta &M, size_t track, size_t index, bool isKeyIndex){ DTSC::Fragments fragments(M.fragments(track)); DTSC::Keys keys(M.keys(track)); DTSC::Parts parts(M.parts(track)); - size_t firstKey = fragments.getFirstKey(fragment); + size_t firstKey = (isKeyIndex ? index : fragments.getFirstKey(index)); size_t endKey = keys.getEndValid(); - if (fragment + 1 < fragments.getEndValid()){endKey = fragments.getFirstKey(fragment + 1);} + if (isKeyIndex) { + if (index + 1 < keys.getEndValid()){endKey = index + 1;} + } else { + if (index + 1 < fragments.getEndValid()){endKey = fragments.getFirstKey(index + 1);} + } size_t firstPart = keys.getFirstPart(firstKey); size_t endPart = parts.getEndValid(); @@ -22,13 +28,13 @@ namespace CMAF{ // EDTS Box needed? + 36 size_t res = 36 + 8 + 108 + 8 + 92 + 8 + 32 + 33 + 44 + 8 + 20 + 16 + 16 + 16 + 40; - res += M.getTrackIdentifier(track).size(); + res += M.getType(track).size(); // Type-specific boxes std::string tType = M.getType(track); - if (tType == "video"){res += 20 + 16 + 86 + 16 + 8 + M.getInit(track).size();} + if (tType == "video"){res += 20 + 16 + 86 + 16 + 8 + M.getInit(track).size() + 20;}//20 for btrt box if (tType == "audio"){ - res += 16 + 16 + 36 + 35 + (M.getInit(track).size() ? 2 + M.getInit(track).size() : 0); + res += 16 + 16 + 36 + 35 + (M.getInit(track).size() ? 2 + M.getInit(track).size() : 0) + 20;//20 for btrt box } if (tType == "meta"){res += 12 + 16 + 64;} @@ -36,8 +42,16 @@ namespace CMAF{ return res; } + + size_t simplifiedTrackId(const DTSC::Meta & M, size_t idx) { + std::string type = M.getType(idx); + if (type == "video") {return 1;} + if (type == "audio") {return 2;} + if (type == "meta") {return 3;} + return idx; + } - std::string trackHeader(const DTSC::Meta &M, size_t track){ + std::string trackHeader(const DTSC::Meta &M, size_t track, bool simplifyTrackIds){ std::string tType = M.getType(track); std::stringstream header; @@ -53,12 +67,15 @@ namespace CMAF{ MP4::MOOV moovBox; MP4::MVHD mvhdBox(0); - mvhdBox.setTrackID(track + 2); // This value needs to point to an unused trackid + mvhdBox.setTrackID(0xFFFFFFFF); // This value needs to point to an unused trackid moovBox.setContent(mvhdBox, 0); MP4::TRAK trakBox; MP4::TKHD tkhdBox(M, track); + if (simplifyTrackIds){ + tkhdBox.setTrackID(simplifiedTrackId(M, track)); + } tkhdBox.setDuration(0); trakBox.setContent(tkhdBox, 0); @@ -67,7 +84,7 @@ namespace CMAF{ MP4::MDHD mdhdBox(0, M.getLang(track)); mdiaBox.setContent(mdhdBox, 0); - MP4::HDLR hdlrBox(tType, M.getTrackIdentifier(track)); + MP4::HDLR hdlrBox(tType, M.getType(track)); mdiaBox.setContent(hdlrBox, 1); MP4::MINF minfBox; @@ -95,9 +112,21 @@ namespace CMAF{ MP4::STSD stsdBox(0); if (tType == "video"){ MP4::VisualSampleEntry sampleEntry(M, track); + MP4::BTRT btrtBox; + btrtBox.setDecodingBufferSize(0xFFFFFFFFull); + btrtBox.setAverageBitrate(M.getBps(track)); + btrtBox.setMaxBitrate(M.getMaxBps(track)); + + sampleEntry.setBoxEntry(sampleEntry.getBoxEntryCount(),btrtBox); stsdBox.setEntry(sampleEntry, 0); }else if (tType == "audio"){ MP4::AudioSampleEntry sampleEntry(M, track); + MP4::BTRT btrtBox; + btrtBox.setDecodingBufferSize(0xFFFFFFFFull); + btrtBox.setAverageBitrate(M.getBps(track)); + btrtBox.setMaxBitrate(M.getMaxBps(track)); + + sampleEntry.setBoxEntry(sampleEntry.getBoxEntryCount(),btrtBox); stsdBox.setEntry(sampleEntry, 0); }else if (tType == "meta"){ MP4::TextSampleEntry sampleEntry(M, track); @@ -132,6 +161,9 @@ namespace CMAF{ } MP4::TREX trexBox(track + 1); + if (simplifyTrackIds){ + trexBox.setTrackID(simplifiedTrackId(M, track)); + } trexBox.setDefaultSampleDuration(1000); mvexBox.setContent(trexBox, M.getVod() ? 1 : 0); @@ -196,7 +228,7 @@ namespace CMAF{ return tmpRes; } - std::string fragmentHeader(const DTSC::Meta &M, size_t track, size_t fragment){ + std::string fragmentHeader(const DTSC::Meta &M, size_t track, size_t fragment, bool simplifyTrackIds, bool UTCTime){ DTSC::Fragments fragments(M.fragments(track)); DTSC::Keys keys(M.keys(track)); @@ -258,18 +290,21 @@ namespace CMAF{ tfhdBox.setFlags(MP4::tfhdSampleFlag | MP4::tfhdBaseIsMoof | MP4::tfhdSampleDesc); tfhdBox.setTrackID(track + 1); + if (simplifyTrackIds){ + tfhdBox.setTrackID(simplifiedTrackId(M, track)); + } tfhdBox.setDefaultSampleDuration(444); tfhdBox.setDefaultSampleSize(444); tfhdBox.setDefaultSampleFlags((M.getType(track) == "video") ? (MP4::noIPicture | MP4::noKeySample) : (MP4::isIPicture | MP4::isKeySample)); - tfhdBox.setSampleDescriptionIndex(0); + tfhdBox.setSampleDescriptionIndex(1); trafBox.setContent(tfhdBox, 0); MP4::TFDT tfdtBox; if (M.getVod()){ tfdtBox.setBaseMediaDecodeTime(M.getTimeForFragmentIndex(track, fragment) - M.getFirstms(track)); }else{ - tfdtBox.setBaseMediaDecodeTime(M.getTimeForFragmentIndex(track, fragment)); + tfdtBox.setBaseMediaDecodeTime((UTCTime ? Util::epoch()*1000 : M.getTimeForFragmentIndex(track, fragment))); } trafBox.setContent(tfdtBox, 1); @@ -299,4 +334,106 @@ namespace CMAF{ return header.str(); } + + /// Calculates the full size of a 'moof' box for a DTSC::Key based fragment. + /// Used when building the 'moof' box to calculate the relative data offsets. + size_t keyHeaderSize(const DTSC::Meta &M, size_t track, size_t key){ + uint64_t tmpRes = 8 + 16 + 32 + 20; + + DTSC::Keys keys(M.keys(track)); + DTSC::Parts parts(M.parts(track)); + + size_t firstPart = keys.getFirstPart(key); + size_t endPart = parts.getEndValid(); + if (key + 1 < keys.getEndValid()){ + endPart = keys.getFirstPart(key + 1); + } + + tmpRes += 24 + ((endPart - firstPart) * 12); + return tmpRes; + } + + /// Generates the 'moof' box for a DTSC::Key based CMAF fragment. + std::string keyHeader(const DTSC::Meta &M, size_t track, size_t key, bool simplifyTrackIds, bool UTCTime){ + DTSC::Keys keys(M.keys(track)); + DTSC::Parts parts(M.parts(track)); + + size_t firstPart = keys.getFirstPart(key); + size_t endPart = parts.getEndValid(); + if (key + 1 < keys.getEndValid()){ + endPart = keys.getFirstPart(key + 1); + } + + std::stringstream header; + + MP4::MOOF moofBox; + MP4::MFHD mfhdBox(key + 1); + moofBox.setContent(mfhdBox, 0); + + + std::set trunOrder; + + //We use keyHeaderSize here to determine the relative offsets of the data in the 'mdat' box. + uint64_t relativeOffset = keyHeaderSize(M, track, key) + 8; + + sortPart temp; + temp.time = keys.getTime(key); + temp.partIndex = firstPart; + temp.bytePos = relativeOffset; + + for (size_t p = firstPart; p < endPart; p++){ + trunOrder.insert(temp); + temp.time += parts.getDuration(p); + temp.partIndex++; + temp.bytePos += parts.getSize(p); + } + + MP4::TRAF trafBox; + MP4::TFHD tfhdBox; + + tfhdBox.setFlags(MP4::tfhdSampleFlag | MP4::tfhdBaseIsMoof | MP4::tfhdSampleDesc); + tfhdBox.setTrackID(track + 1); + if (simplifyTrackIds){ + tfhdBox.setTrackID(simplifiedTrackId(M, track)); + } + tfhdBox.setDefaultSampleDuration(444); + tfhdBox.setDefaultSampleSize(444); + tfhdBox.setDefaultSampleFlags((M.getType(track) == "video") ? (MP4::noIPicture | MP4::noKeySample) + : (MP4::isIPicture | MP4::isKeySample)); + tfhdBox.setSampleDescriptionIndex(1); + trafBox.setContent(tfhdBox, 0); + + MP4::TFDT tfdtBox; + if (M.getVod()){ + tfdtBox.setBaseMediaDecodeTime(keys.getTime(key) - M.getFirstms(track)); + }else{ + tfdtBox.setBaseMediaDecodeTime((UTCTime ? Util::epoch()*1000 : keys.getTime(key) )); + } + trafBox.setContent(tfdtBox, 1); + + MP4::TRUN trunBox; + trunBox.setFlags(MP4::trundataOffset | MP4::trunfirstSampleFlags | MP4::trunsampleSize | + MP4::trunsampleDuration | MP4::trunsampleOffsets); + + trunBox.setDataOffset(trunOrder.begin()->bytePos); + + trunBox.setFirstSampleFlags(MP4::isIPicture | MP4::isKeySample); + + size_t trunOffset = 0; + + for (std::set::iterator it = trunOrder.begin(); it != trunOrder.end(); it++){ + MP4::trunSampleInformation sampleInfo; + sampleInfo.sampleSize = parts.getSize(it->partIndex); + sampleInfo.sampleDuration = parts.getDuration(it->partIndex); + sampleInfo.sampleOffset = parts.getOffset(it->partIndex); + trunBox.setSampleInformation(sampleInfo, trunOffset++); + } + trafBox.setContent(trunBox, 2); + + moofBox.setContent(trafBox, 1); + + header.write(moofBox.asBox(), moofBox.boxedSize()); + + return header.str(); + } }// namespace CMAF diff --git a/lib/cmaf.h b/lib/cmaf.h index 7b151fd1..793f1b34 100644 --- a/lib/cmaf.h +++ b/lib/cmaf.h @@ -4,9 +4,11 @@ #include namespace CMAF{ - size_t payloadSize(const DTSC::Meta &M, size_t track, size_t fragment); + size_t payloadSize(const DTSC::Meta &M, size_t track, size_t index, bool isKeyIndex = false); size_t trackHeaderSize(const DTSC::Meta &M, size_t track); - std::string trackHeader(const DTSC::Meta &M, size_t track); + std::string trackHeader(const DTSC::Meta &M, size_t track, bool simplifyTrackIds = false); size_t fragmentHeaderSize(const DTSC::Meta &M, size_t track, size_t fragment); - std::string fragmentHeader(const DTSC::Meta &M, size_t track, size_t fragment); + std::string fragmentHeader(const DTSC::Meta &M, size_t track, size_t fragment, bool simplifyTrackIds = false, bool UTCTime = false); + size_t keyHeaderSize(const DTSC::Meta &M, size_t track, size_t key); + std::string keyHeader(const DTSC::Meta &M, size_t track, size_t key, bool simplifyTrackIds = false, bool UTCTime = false); }// namespace CMAF diff --git a/lib/mp4.cpp b/lib/mp4.cpp index 68bc9b5d..38a49a2c 100644 --- a/lib/mp4.cpp +++ b/lib/mp4.cpp @@ -308,6 +308,7 @@ namespace MP4{ case 0x74656E63: return ((TENC *)this)->toPrettyString(indent); break; case 0x7361697A: return ((SAIZ *)this)->toPrettyString(indent); break; case 0x7361696F: return ((SAIO *)this)->toPrettyString(indent); break; + case 0x62747274: return ((BTRT *)this)->toPrettyString(indent); break; /*LTS-END*/ default: INFO_MSG("no code found: 0x%.8x", Bit::btohl(data + 4)); break; } diff --git a/lib/mp4_generic.cpp b/lib/mp4_generic.cpp index fb692aca..62bc9714 100644 --- a/lib/mp4_generic.cpp +++ b/lib/mp4_generic.cpp @@ -75,6 +75,38 @@ namespace MP4{ return r.str(); } + BTRT::BTRT() { + memcpy(data + 4, "btrt", 4); + } + + uint32_t BTRT::getDecodingBufferSize(){ + return getInt32(0); + } + void BTRT::setDecodingBufferSize(uint32_t val){ + setInt32(val, 0); + } + uint32_t BTRT::getMaxBitrate(){ + return getInt32(4); + } + void BTRT::setMaxBitrate(uint32_t val){ + setInt32(val, 4); + } + uint32_t BTRT::getAverageBitrate(){ + return getInt32(8); + } + void BTRT::setAverageBitrate(uint32_t val){ + setInt32(val, 8); + } + + std::string BTRT::toPrettyString(uint32_t indent) { + std::stringstream r; + r << std::string(indent, ' ') << "[btrt] Bitrate Box (" << boxedSize() << ")" << std::endl; + r << std::string(indent+2, ' ') << "DecodingBufferSize: " << getDecodingBufferSize() << std::endl; + r << std::string(indent+2, ' ') << "Maximum Bitrate: " << getMaxBitrate() << std::endl; + r << std::string(indent+2, ' ') << "Average Bitrate: " << getAverageBitrate() << std::endl; + return r.str(); + } + TRUN::TRUN(){memcpy(data + 4, "trun", 4);} void TRUN::setFlags(uint32_t newFlags){setInt24(newFlags, 1);} @@ -1217,6 +1249,14 @@ namespace MP4{ memcpy(data + payloadOffset + 4, newMinorVersion, 4); } + std::string FTYP::getMinorVersionHex(){ + static char zero[4] ={0, 0, 0, 0}; + if (memcmp(zero, data + payloadOffset + 4, 4) == 0){return "";} + char val[20]; + snprintf(val, 20, "%.2X%.2X%.2X%.2X", data[payloadOffset + 4], data[payloadOffset + 5], data[payloadOffset + 6], data[payloadOffset + 7]); + return std::string(val); + } + std::string FTYP::getMinorVersion(){ static char zero[4] ={0, 0, 0, 0}; if (memcmp(zero, data + payloadOffset + 4, 4) == 0){return "";} @@ -1241,7 +1281,7 @@ namespace MP4{ std::stringstream r; r << std::string(indent, ' ') << "[ftyp] File Type (" << boxedSize() << ")" << std::endl; r << std::string(indent + 1, ' ') << "MajorBrand: " << getMajorBrand() << std::endl; - r << std::string(indent + 1, ' ') << "MinorVersion: " << getMinorVersion() << std::endl; + r << std::string(indent + 1, ' ') << "MinorVersion: 0x" << getMinorVersionHex() << std::endl; r << std::string(indent + 1, ' ') << "CompatibleBrands (" << getCompatibleBrandsCount() << "):" << std::endl; for (unsigned int i = 0; i < getCompatibleBrandsCount(); i++){ @@ -1765,7 +1805,7 @@ namespace MP4{ if (i != getMatrixCount() - 1){r << ", ";} } r << std::endl; - r << std::string(indent + 1, ' ') << "TrackID: " << getTrackID() << std::endl; + r << std::string(indent + 1, ' ') << "next_track_ID: " << getTrackID() << std::endl; return r.str(); } @@ -2887,6 +2927,55 @@ namespace MP4{ } /*LTS-END*/ + size_t AudioSampleEntry::getBoxEntryCount(){ + if (payloadSize() < 36){// if the EntryBox is not big enough to hold any box + return 0; + } + size_t count = 0; + size_t offset = 28; + while (offset < payloadSize()){ + offset += getBoxLen(offset); + count++; + } + return count; + } + + Box &AudioSampleEntry::getBoxEntry(size_t index){ + static Box ret = Box((char *)"\000\000\000\010erro", false); + if (index >= getBoxEntryCount()){return ret;} + size_t count = 0; + size_t offset = 28; + while (offset < payloadSize()){ + if (count == index){return getBox(offset);} + offset += getBoxLen(offset); + count++; + } + return ret; + } + + void AudioSampleEntry::setBoxEntry(size_t index, Box &box){ + if (index > getBoxEntryCount()){ + index = getBoxEntryCount(); + WARN_MSG("This function can not leave empty spaces, appending at index %zu nstead!", index); + } + size_t count = 0; + size_t offset = 28; + while (offset < payloadSize()){ + if (count == index){ + setBox(box, offset); + return; + } + offset += getBoxLen(offset); + count++; + } + if (count == index){ + setBox(box, offset); + }else{ + INFO_MSG("Should not be here! Index is %zu, count is %zu, offset is %zu, payloadSize is %zu", + index, count, offset, payloadSize()); + } + } + std::string AudioSampleEntry::toPrettyAudioString(uint32_t indent, std::string name){ std::stringstream r; r << std::string(indent, ' ') << name << " (" << boxedSize() << ")" << std::endl; @@ -2895,10 +2984,13 @@ namespace MP4{ r << std::string(indent + 1, ' ') << "SampleSize: " << getSampleSize() << std::endl; r << std::string(indent + 1, ' ') << "PreDefined: " << getPreDefined() << std::endl; r << std::string(indent + 1, ' ') << "SampleRate: " << getSampleRate() << std::endl; - r << getCodecBox().toPrettyString(indent + 1) << std::endl; - /*LTS-START*/ - if (isType("enca")){r << getSINFBox().toPrettyString(indent + 1);} - /*LTS-END*/ + size_t firstBox = 0; + if (getBoxEntryCount() > firstBox){ + for (size_t index = firstBox; index < getBoxEntryCount(); ++index){ + MP4::Box tmpBox = getBoxEntry(index); + r << tmpBox.toPrettyString(indent + 1); + } + } return r.str(); } diff --git a/lib/mp4_generic.h b/lib/mp4_generic.h index b16adbb3..fd4761f2 100644 --- a/lib/mp4_generic.h +++ b/lib/mp4_generic.h @@ -31,7 +31,19 @@ namespace MP4{ }; // TRAF Box - struct trunSampleInformation{ + class BTRT: public Box { + public: + BTRT(); + uint32_t getDecodingBufferSize(); + void setDecodingBufferSize(uint32_t val); + uint32_t getMaxBitrate(); + void setMaxBitrate(uint32_t val); + uint32_t getAverageBitrate(); + void setAverageBitrate(uint32_t val); + std::string toPrettyString(uint32_t indent = 0); + }; + + struct trunSampleInformation { uint32_t sampleDuration; uint32_t sampleSize; uint32_t sampleFlags; @@ -273,6 +285,7 @@ namespace MP4{ void setMajorBrand(const char *newMajorBrand); std::string getMajorBrand(); void setMinorVersion(const char *newMinorVersion); + std::string getMinorVersionHex(); std::string getMinorVersion(); size_t getCompatibleBrandsCount(); void setCompatibleBrands(const char *newCompatibleBrand, size_t index); @@ -719,6 +732,11 @@ namespace MP4{ void setCodecBox(Box &newBox); Box &getCodecBox(); Box &getSINFBox(); /*LTS*/ + + size_t getBoxEntryCount(); + Box &getBoxEntry(size_t index); + void setBoxEntry(size_t index, Box &box); + std::string toPrettyAudioString(uint32_t indent = 0, std::string name = ""); }; diff --git a/src/output/output_cmaf.cpp b/src/output/output_cmaf.cpp index 5c03c680..528035d2 100644 --- a/src/output/output_cmaf.cpp +++ b/src/output/output_cmaf.cpp @@ -16,13 +16,80 @@ uint64_t bootMsOffset; namespace Mist{ + void CMAFPushTrack::connect(std::string debugParam) { + D.setHeader("Transfer-Encoding", "chunked"); + D.prepareRequest(url, "POST"); + + HTTP::Parser & http = D.getHTTP(); + http.sendingChunks = true; + http.SendRequest(D.getSocket()); + + if (debugParam.length()){ + if (debugParam[debugParam.length()-1] != '/'){ + debugParam += '/'; + } + debug = true; + std::string filename = url.getUrl(); + filename.erase(0, filename.rfind("/")+1); + snprintf(debugName, 500, "%s%s-%" PRIu64, debugParam.c_str(), filename.c_str(), Util::bootMS()); + INFO_MSG("CMAF DEBUG FILE: %s", debugName); + debugFile = fopen(debugName, "wb"); + } + } + + void CMAFPushTrack::disconnect() { + Socket::Connection & sock = D.getSocket(); + + MP4::MFRA mfraBox; + send(mfraBox.asBox(), mfraBox.boxedSize()); + send(""); + sock.close(); + + if (debugFile) { + fclose(debugFile); + debugFile = 0; + } + } + + void CMAFPushTrack::send(const char * data, size_t len){ + D.getHTTP().Chunkify(data, len, D.getSocket()); + if (debug && debugFile) { + fwrite(data, 1, len, debugFile); + + } + } + + void CMAFPushTrack::send(const std::string & data){ + send(data.data(), data.size()); + } OutCMAF::OutCMAF(Socket::Connection &conn) : HTTPOutput(conn){ uaDelay = 0; - realTime = 0; + if (config->getString("target").size()){ + needsLookAhead = 5000; + + streamName = config->getString("streamname"); + std::string target = config->getString("target"); + target.replace(0, 4, "http");//Translate to http for cmaf:// or https for cmafs:// + pushUrl = HTTP::URL(target); + + INFO_MSG("About to push stream %s out. Host: %s, port: %d, location: %s", streamName.c_str(), + pushUrl.host.c_str(), pushUrl.getPort(), pushUrl.path.c_str()); + initialize(); + initialSeek(); + startPushOut(); + } else { + realTime = 0; + } + INFO_MSG("Out of constructor now"); } - OutCMAF::~OutCMAF(){} + //Properly end all tracks on shutdown. + OutCMAF::~OutCMAF() { + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + onTrackEnd(it->first); + } + } void OutCMAF::init(Util::Config *cfg){ HTTPOutput::init(cfg); @@ -66,6 +133,16 @@ namespace Mist{ "Disables chunked transfer encoding, forcing per-segment buffering. Reduces performance " "significantly, but increases compatibility somewhat."; capa["optional"]["nonchunked"]["option"] = "--nonchunked"; + + capa["push_urls"].append("cmaf://*"); + capa["push_urls"].append("cmafs://*"); + + JSON::Value opt; + opt["arg"] = "string"; + opt["default"] = ""; + opt["arg_num"] = 1; + opt["help"] = "Target CMAF URL to push out towards."; + cfg->addOption("target", opt); } void OutCMAF::onHTTP(){ @@ -157,7 +234,7 @@ namespace Mist{ uint64_t fragmentIndex = M.getFragmentIndexForTime(idx, startTime); targetTime = M.getTimeForFragmentIndex(idx, fragmentIndex + 1); - std::string headerData = CMAF::fragmentHeader(M, idx, fragmentIndex); + std::string headerData = CMAF::fragmentHeader(M, idx, fragmentIndex, false, false); H.Chunkify(headerData.c_str(), headerData.size(), myConn); uint64_t mdatSize = 8 + CMAF::payloadSize(M, idx, fragmentIndex); @@ -172,7 +249,13 @@ namespace Mist{ parseData = true; } + + void OutCMAF::sendNext(){ + if (isRecording()){ + pushNext(); + return; + } if (thisPacket.getTime() >= targetTime){ HIGH_MSG("Finished playback to %" PRIu64, targetTime); wantRequest = true; @@ -682,6 +765,122 @@ namespace Mist{ r << "\n"; return toUTF16(r.str()); - }// namespace Mist + } + + /**********************************/ + /* CMAF Push Output functionality */ + /**********************************/ + + //When we disconnect a track, or when we're done pushing out, send an empty 'mfra' box to indicate track end. + void OutCMAF::onTrackEnd(size_t idx) { + if (!isRecording()){return;} + if (!pushTracks.count(idx) || !pushTracks.at(idx).D.getSocket()){return;} + INFO_MSG("Disconnecting track %zu", idx); + pushTracks[idx].disconnect(); + + pushTracks.erase(idx); + } + + //Create the connections and post request needed to start pushing out a track. + void OutCMAF::setupTrackObject(size_t idx) { + CMAFPushTrack & track = pushTracks[idx]; + track.url = pushUrl; + if (targetParams.count("usp") && targetParams["usp"] == "1"){ + std::string usp_path = "Streams(" + M.getType(idx) + + "_" + JSON::Value(idx).asString() + ")"; + track.url = track.url.link(usp_path); + }else{ + track.url.path += "/"; + track.url = track.url.link(M.getTrackIdentifier(idx)); + } + + track.connect(targetParams["debug"]); + + std::string header = CMAF::trackHeader(M, idx, true); + track.send(header); + } + + + /// Function that waits at most `maxWait` ms (in steps of 100ms) for the next keyframe to become available. + /// Uses thisIdx and thisPacket to determine track and current timestamp respectively. + bool OutCMAF::waitForNextKey(uint64_t maxWait){ + size_t currentKey = M.getKeyIndexForTime(thisIdx, thisPacket.getTime()); + DTSC::Keys keys(M.keys(thisIdx)); + size_t waitTimes = maxWait / 100; + for (size_t i = 0; i < waitTimes; ++i){ + if (keys.getEndValid() > currentKey + 1){return true;} + Util::wait(100); + //Make sure we don't accidentally timeout while waiting - runs approximately every second. + if (i % 10 == 0){ + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); ++it){ + it->second.keepAlive(); + stats(); + } + } + } + return (keys.getEndValid() > currentKey + 1); + } + + //Set up an empty connection to the target to make sure we can push data towards it. + void OutCMAF::startPushOut(){ + myConn.close(); + myConn.Received().clear(); + myConn.open(pushUrl.host, pushUrl.getPort(), true); + wantRequest = false; + parseData = true; + } + + //CMAF Push output uses keyframe boundaries instead of fragment boundaries, to allow for lower latency + void OutCMAF::pushNext() { + //Set up a new connection if this is a new track, or if we have been disconnected. + if (!pushTracks.count(thisIdx) || !pushTracks.at(thisIdx).D.getSocket()){ + CMAFPushTrack & track = pushTracks[thisIdx]; + size_t keyIndex = M.getKeyIndexForTime(thisIdx, thisPacket.getTime()); + track.headerFrom = M.getTimeForKeyIndex(thisIdx, keyIndex); + if (track.headerFrom < thisPacket.getTime()){ + track.headerFrom = M.getTimeForKeyIndex(thisIdx, keyIndex + 1); + } + + HIGH_MSG("Starting track %zu at %" PRIu64 "ms into the stream, current packet at %" PRIu64 "ms", thisIdx, track.headerFrom, thisPacket.getTime()); + + setupTrackObject(thisIdx); + track.headerUntil = 0; + + } + CMAFPushTrack & track = pushTracks[thisIdx]; + if (thisPacket.getTime() < track.headerFrom){return;} + if (thisPacket.getTime() > track.headerUntil || !track.headerUntil){ + size_t keyIndex = M.getKeyIndexForTime(thisIdx, thisPacket.getTime()); + uint64_t keyTime = M.getTimeForKeyIndex(thisIdx, keyIndex); + if (keyTime != thisPacket.getTime()){ + WARN_MSG("Corruption probably occured, initiating reconnect %" PRIu64 " != %" PRIu64, keyTime, thisPacket.getTime()); + onTrackEnd(thisIdx); + track.headerFrom = M.getTimeForKeyIndex(thisIdx, keyIndex + 1); + track.headerUntil = 0; + pushNext(); + return; + } + track.headerFrom = keyTime; + if (!waitForNextKey()){ + onTrackEnd(thisIdx); + dropTrack(thisIdx, "No next keyframe available"); + return; + } + track.headerUntil = M.getTimeForKeyIndex(thisIdx, keyIndex + 1) - 1; + std::string keyHeader = CMAF::keyHeader(M, thisIdx, keyIndex, true, true); + + uint64_t mdatSize = 8 + CMAF::payloadSize(M, thisIdx, keyIndex, true); + char mdatHeader[] ={0x00, 0x00, 0x00, 0x00, 'm', 'd', 'a', 't'}; + Bit::htobl(mdatHeader, mdatSize); + + track.send(keyHeader); + track.send(mdatHeader, 8); + } + char *data; + size_t dataLen; + thisPacket.getString("data", data, dataLen); + + track.send(data, dataLen); + } + }// namespace Mist diff --git a/src/output/output_cmaf.h b/src/output/output_cmaf.h index 76fa63b8..f24bfab3 100644 --- a/src/output/output_cmaf.h +++ b/src/output/output_cmaf.h @@ -1,8 +1,30 @@ #include "output_http.h" #include +#include #include namespace Mist{ + /// Keeps track of the state of an outgoing CMAF Push track. + class CMAFPushTrack { + public: + CMAFPushTrack() {debug = false; debugFile = 0;} + ~CMAFPushTrack() {disconnect();} + void connect(std::string debugParam = ""); + void disconnect(); + + void send(const char * data, size_t len); + void send(const std::string & data); + + HTTP::Downloader D; + HTTP::URL url; + uint64_t headerFrom; + uint64_t headerUntil; + + bool debug; + char debugName[500]; + FILE * debugFile; + }; + class OutCMAF : public HTTPOutput{ public: OutCMAF(Socket::Connection &conn); @@ -13,6 +35,8 @@ namespace Mist{ void sendHeader(){}; protected: + void onTrackEnd(size_t idx); + void sendDashManifest(); void dashAdaptationSet(size_t id, size_t idx, std::stringstream &r); void dashRepresentation(size_t id, size_t idx, std::stringstream &r); @@ -37,6 +61,16 @@ namespace Mist{ std::string h264init(const std::string &initData); std::string h265init(const std::string &initData); + + // For CMAF push out + void startPushOut(); + void pushNext(); + + HTTP::URL pushUrl; + std::map pushTracks; + void setupTrackObject(size_t idx); + bool waitForNextKey(uint64_t maxWait = 5000); + // End CMAF push out }; }// namespace Mist