From 17aa6bbbb681456cfc79b50208efbf58ef2ced17 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 21 May 2015 19:10:05 +0200 Subject: [PATCH] Compatibility improvements as well as simplification to RTMP push input. --- lib/flv_tag.cpp | 154 +++++++++++++++++-------------------- lib/flv_tag.h | 3 +- src/input/input_flv.cpp | 6 +- src/output/output_rtmp.cpp | 60 ++++----------- src/output/output_rtmp.h | 14 ---- 5 files changed, 90 insertions(+), 147 deletions(-) diff --git a/lib/flv_tag.cpp b/lib/flv_tag.cpp index 5437f4ea..5dcc9212 100644 --- a/lib/flv_tag.cpp +++ b/lib/flv_tag.cpp @@ -1,7 +1,6 @@ /// \file flv_tag.cpp /// Holds all code for the FLV namespace. -#include "amf.h" #include "rtmpchunks.h" #include "flv_tag.h" #include "timing.h" @@ -1033,9 +1032,18 @@ bool FLV::Tag::FileLoader(FILE * f) { return false; } //FLV_GetPacket -JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata) { +JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack) { JSON::Value pack_out; // Storage for outgoing metadata. + if (!reTrack){ + switch (data[0]){ + case 0x09: reTrack = 1; break;//video + case 0x08: reTrack = 2; break;//audio + case 0x12: reTrack = 3; break;//meta + } + } + pack_out["trackid"] = reTrack; + if (data[0] == 0x12) { AMF::Object meta_in = AMF::parse((unsigned char *)data + 11, len - 15); AMF::Object * tmp = 0; @@ -1047,126 +1055,97 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata) { } } if (tmp) { - if (tmp->getContentP("width")) { - metadata.tracks[1].width = (long long int)tmp->getContentP("width")->NumValue(); - } else { - metadata.tracks[1].width = 0; - } - if (tmp->getContentP("height")) { - metadata.tracks[1].height = (long long int)tmp->getContentP("height")->NumValue(); - } else { - metadata.tracks[1].height = 0; - } - if (tmp->getContentP("videoframerate")) { - if (tmp->getContentP("videoframerate")->NumValue()){ - metadata.tracks[1].fpks = (long long int)(tmp->getContentP("videoframerate")->NumValue() * 1000.0); - }else{ - metadata.tracks[1].fpks = JSON::Value(tmp->getContentP("videoframerate")->StrValue()).asInt() * 1000.0; - } - } else { - metadata.tracks[1].fpks = 0; - } - if (tmp->getContentP("videodatarate")) { - metadata.tracks[1].bps = (long long int)(tmp->getContentP("videodatarate")->NumValue() * 1024) / 8; - } else { - metadata.tracks[1].bps = 0; - } - if (tmp->getContentP("audiodatarate")) { - metadata.tracks[2].bps = (long long int)(tmp->getContentP("audiodatarate")->NumValue() * 1024) / 8; - } else { - metadata.tracks[2].bps = 0; - } - if (tmp->getContentP("audiosamplerate")) { - metadata.tracks[2].rate = (long long int)tmp->getContentP("audiosamplerate")->NumValue(); - } else { - metadata.tracks[2].rate = 0; - } - if (tmp->getContentP("audiosamplesize")) { - metadata.tracks[2].size = (long long int)tmp->getContentP("audiosamplesize")->NumValue(); - } else { - metadata.tracks[2].size = 0; - } - if (tmp->getContentP("stereo")) { - if (tmp->getContentP("stereo")->NumValue() == 1) { - metadata.tracks[2].channels = 2; - } else { - metadata.tracks[2].channels = 1; - } - } else { - metadata.tracks[2].channels = 1; - } + amf_storage = *tmp; + bool empty = true; for (int i = 0; i < tmp->hasContent(); ++i) { if (tmp->getContentP(i)->Indice() == "videocodecid" || tmp->getContentP(i)->Indice() == "audiocodecid" || tmp->getContentP(i)->Indice() == "width" || tmp->getContentP(i)->Indice() == "height" || tmp->getContentP(i)->Indice() == "videodatarate" || tmp->getContentP(i)->Indice() == "videoframerate" || tmp->getContentP(i)->Indice() == "audiodatarate" || tmp->getContentP(i)->Indice() == "audiosamplerate" || tmp->getContentP(i)->Indice() == "audiosamplesize" || tmp->getContentP(i)->Indice() == "audiochannels") { continue; } if (tmp->getContentP(i)->NumValue()) { pack_out["data"][tmp->getContentP(i)->Indice()] = (long long)tmp->getContentP(i)->NumValue(); + empty = false; } else { if (tmp->getContentP(i)->StrValue() != "") { pack_out["data"][tmp->getContentP(i)->Indice()] = tmp->getContentP(i)->StrValue(); + empty = false; } } } - if (pack_out) { + if (!empty) { pack_out["datatype"] = "meta"; pack_out["time"] = tagTime(); + }else{ + pack_out.null(); } } return pack_out; //empty } if (data[0] == 0x08) { char audiodata = data[11]; - metadata.tracks[2].trackID = 2; - metadata.tracks[2].type = "audio"; - if (metadata.tracks[2].codec == "") { - metadata.tracks[2].codec = getAudioCodec(); + metadata.tracks[reTrack].trackID = reTrack; + metadata.tracks[reTrack].type = "audio"; + if (metadata.tracks[reTrack].codec == "") { + metadata.tracks[reTrack].codec = getAudioCodec(); } - if (!metadata.tracks[2].rate) { + if (!metadata.tracks[reTrack].rate) { switch (audiodata & 0x0C) { case 0x0: - metadata.tracks[2].rate = 5512; + metadata.tracks[reTrack].rate = 5512; break; case 0x4: - metadata.tracks[2].rate = 11025; + metadata.tracks[reTrack].rate = 11025; break; case 0x8: - metadata.tracks[2].rate = 22050; + metadata.tracks[reTrack].rate = 22050; break; case 0xC: - metadata.tracks[2].rate = 44100; + metadata.tracks[reTrack].rate = 44100; break; } + if (amf_storage.getContentP("audiosamplerate")) { + metadata.tracks[reTrack].rate = (long long int)amf_storage.getContentP("audiosamplerate")->NumValue(); + } } - if (!metadata.tracks[2].size) { + if (!metadata.tracks[reTrack].size) { switch (audiodata & 0x02) { case 0x0: - metadata.tracks[2].size = 8; + metadata.tracks[reTrack].size = 8; break; case 0x2: - metadata.tracks[2].size = 16; + metadata.tracks[reTrack].size = 16; break; } + if (amf_storage.getContentP("audiosamplesize")) { + metadata.tracks[reTrack].size = (long long int)amf_storage.getContentP("audiosamplesize")->NumValue(); + } } - if (!metadata.tracks[2].channels) { + if (!metadata.tracks[reTrack].channels) { switch (audiodata & 0x01) { case 0x0: - metadata.tracks[2].channels = 1; + metadata.tracks[reTrack].channels = 1; break; case 0x1: - metadata.tracks[2].channels = 2; + metadata.tracks[reTrack].channels = 2; break; } + if (amf_storage.getContentP("stereo")) { + if (amf_storage.getContentP("stereo")->NumValue() == 1) { + metadata.tracks[reTrack].channels = 2; + } else { + metadata.tracks[reTrack].channels = 1; + } + } } if (needsInitData() && isInitData()) { if ((audiodata & 0xF0) == 0xA0) { - metadata.tracks[2].init = std::string((char *)data + 13, (size_t)len - 17); + metadata.tracks[reTrack].init = std::string((char *)data + 13, (size_t)len - 17); } else { - metadata.tracks[2].init = std::string((char *)data + 12, (size_t)len - 16); + metadata.tracks[reTrack].init = std::string((char *)data + 12, (size_t)len - 16); } + pack_out.null(); return pack_out; //skip rest of parsing, get next tag. } pack_out["time"] = tagTime(); - pack_out["trackid"] = 2; if ((audiodata & 0xF0) == 0xA0) { if (len < 18) { return JSON::Value(); @@ -1182,36 +1161,43 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata) { } if (data[0] == 0x09) { char videodata = data[11]; - if (metadata.tracks[1].codec == "") { - metadata.tracks[1].codec = getVideoCodec(); + if (metadata.tracks[reTrack].codec == "") { + metadata.tracks[reTrack].codec = getVideoCodec(); + } + metadata.tracks[reTrack].type = "video"; + metadata.tracks[reTrack].trackID = reTrack; + if (!metadata.tracks[reTrack].width || !metadata.tracks[reTrack].height){ + if (amf_storage.getContentP("width")) { + metadata.tracks[reTrack].width = (long long int)amf_storage.getContentP("width")->NumValue(); + } + if (amf_storage.getContentP("height")) { + metadata.tracks[reTrack].height = (long long int)amf_storage.getContentP("height")->NumValue(); + } + } + if (!metadata.tracks[reTrack].fpks && amf_storage.getContentP("videoframerate")) { + if (amf_storage.getContentP("videoframerate")->NumValue()){ + metadata.tracks[reTrack].fpks = (long long int)(amf_storage.getContentP("videoframerate")->NumValue() * 1000.0); + }else{ + metadata.tracks[reTrack].fpks = JSON::Value(amf_storage.getContentP("videoframerate")->StrValue()).asInt() * 1000.0; + } } - metadata.tracks[1].type = "video"; - metadata.tracks[1].trackID = 1; if (needsInitData() && isInitData()) { if ((videodata & 0x0F) == 7) { if (len < 21) { return JSON::Value(); } - metadata.tracks[1].init = std::string((char *)data + 16, (size_t)len - 20); + metadata.tracks[reTrack].init = std::string((char *)data + 16, (size_t)len - 20); } else { if (len < 17) { return JSON::Value(); } - metadata.tracks[1].init = std::string((char *)data + 12, (size_t)len - 16); + metadata.tracks[reTrack].init = std::string((char *)data + 12, (size_t)len - 16); } + pack_out.null(); return pack_out; //skip rest of parsing, get next tag. } - pack_out["trackid"] = 1; switch (videodata & 0xF0) { case 0x10: - pack_out["keyframe"] = 1; - break; - case 0x20: - pack_out["interframe"] = 1; - break; - case 0x30: - pack_out["disposableframe"] = 1; - break; case 0x40: pack_out["keyframe"] = 1; break; diff --git a/lib/flv_tag.h b/lib/flv_tag.h index deef0ea7..ac322091 100644 --- a/lib/flv_tag.h +++ b/lib/flv_tag.h @@ -5,6 +5,7 @@ #include "socket.h" #include "dtsc.h" #include "json.h" +#include "amf.h" #include @@ -52,7 +53,7 @@ namespace FLV { bool DTSCAudioInit(DTSC::Track & audio); bool DTSCMetaInit(DTSC::Meta & M, std::set & selTracks); bool DTSCMetaInit(DTSC::Stream & S, DTSC::Track & videoRef, DTSC::Track & audioRef); - JSON::Value toJSON(DTSC::Meta & metadata); + JSON::Value toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack = 0); bool MemLoader(char * D, unsigned int S, unsigned int & P); bool FileLoader(FILE * f); protected: diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 7e32ca4a..23bea01f 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -67,11 +67,12 @@ namespace Mist { //Create header file from FLV data fseek(inFile, 13, SEEK_SET); FLV::Tag tmpTag; + AMF::Object amf_storage; long long int lastBytePos = 13; while (!feof(inFile) && !FLV::Parse_Error){ if (tmpTag.FileLoader(inFile)){ lastPack.null(); - lastPack = tmpTag.toJSON(myMeta); + lastPack = tmpTag.toJSON(myMeta, amf_storage); lastPack["bpos"] = lastBytePos; myMeta.update(lastPack); lastBytePos = ftell(inFile); @@ -89,12 +90,13 @@ namespace Mist { void inputFLV::getNext(bool smart) { static JSON::Value thisPack; + static AMF::Object amf_storage; thisPack.null(); long long int lastBytePos = ftell(inFile); FLV::Tag tmpTag; while (!feof(inFile) && !FLV::Parse_Error){ if (tmpTag.FileLoader(inFile)){ - thisPack = tmpTag.toJSON(myMeta); + thisPack = tmpTag.toJSON(myMeta, amf_storage); thisPack["bpos"] = lastBytePos; if ( !selectedTracks.count(thisPack["trackid"].asInt())){ getNext(); diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index c73e264c..7f71c675 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -792,60 +792,28 @@ namespace Mist { case 8: //audio data case 9: //video data case 18: {//meta data - pushData & p = pushes[next.cs_id]; + static std::map pushMeta; if (!isInitialized) { DEBUG_MSG(DLVL_MEDIUM, "Received useless media data\n"); myConn.close(); break; } F.ChunkLoader(next); - JSON::Value pack_out = F.toJSON(p.meta); + AMF::Object * amf_storage = 0; + if (F.data[0] == 0x12 || pushMeta.count(next.cs_id) || !pushMeta.size()){ + amf_storage = &(pushMeta[next.cs_id]); + }else{ + amf_storage = &(pushMeta.begin()->second); + } + JSON::Value pack_out = F.toJSON(myMeta, *amf_storage, next.cs_id*3 + (F.data[0] == 0x09 ? 0 : (F.data[0] == 0x08 ? 1 : 2) )); if ( !pack_out.isNull()){ - //Check for backwards timestamps - if (pack_out["time"].asInt() < p.meta.tracks[pack_out["trackid"].asInt()].lastms){ - ///Reset all internals - p.sending = false; - p.counter = 0; - p.preBuf.clear(); - p.meta = DTSC::Meta(); - pack_out = F.toJSON(p.meta);//Reinitialize the metadata with this packet. - ///Reset negotiation with buffer - userClient.finish(); - userClient = IPC::sharedClient(streamName + "_users", PLAY_EX_SIZE, true); - } - pack_out["trackid"] = pack_out["trackid"].asInt() + next.cs_id * 3; - if ( !p.sending){ - p.counter++; - if (p.counter > 8){ - p.sending = true; - if (myMeta.tracks.count(1)){ - myMeta = DTSC::Meta(); - } - for (unsigned int i = 1; i < 4; ++i){ - if (p.meta.tracks.count(i)){ - myMeta.tracks[next.cs_id*3+i] = p.meta.tracks[i]; - } - } - if (!userClient.getData()){ - char userPageName[NAME_BUFFER_SIZE]; - snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - userClient = IPC::sharedClient(userPageName, 30, true); - } - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - DEBUG_MSG(DLVL_MEDIUM, "Starting negotiation for track %d", it->first); - continueNegotiate(it->first); - } - for (std::deque::iterator it = p.preBuf.begin(); it != p.preBuf.end(); it++){ - bufferLivePacket((*it)); - } - p.preBuf.clear(); //clear buffer - bufferLivePacket(pack_out); - }else{ - p.preBuf.push_back(pack_out); - } - }else{ - bufferLivePacket(pack_out); + if (!userClient.getData()){ + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + userClient = IPC::sharedClient(userPageName, 30, true); } + continueNegotiate(pack_out["trackid"].asInt()); + bufferLivePacket(pack_out); } break; } diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 110d1984..5eacdae5 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -6,19 +6,6 @@ namespace Mist { - class pushData { - public: - DTSC::Meta meta; - bool sending; - int counter; - std::deque preBuf; - pushData(){ - sending = false; - counter = 0; - } - }; - - class OutRTMP : public Output { public: OutRTMP(Socket::Connection & conn); @@ -33,7 +20,6 @@ namespace Mist { void parseChunk(Socket::Buffer & inputBuffer); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId); void sendCommand(AMF::Object & amfReply, int messageType, int streamId); - std::map pushes; }; }