diff --git a/lib/flv_tag.cpp b/lib/flv_tag.cpp index 520929e5..70636616 100644 --- a/lib/flv_tag.cpp +++ b/lib/flv_tag.cpp @@ -518,7 +518,7 @@ void FLV::Tag::setLen() { data[ --i] = (len4) & 0xFF; } -/// FLV Video init data loader function from JSON. +/// FLV Video init data loader function from metadata. bool FLV::Tag::DTSCVideoInit(DTSC::Track & video) { //Unknown? Assume H264. len = 0; @@ -549,7 +549,7 @@ bool FLV::Tag::DTSCVideoInit(DTSC::Track & video) { return true; } -/// FLV Audio init data loader function from JSON. +/// FLV Audio init data loader function from metadata. bool FLV::Tag::DTSCAudioInit(DTSC::Track & audio) { len = 0; //Unknown? Assume AAC. @@ -709,6 +709,7 @@ bool FLV::Tag::ChunkLoader(const RTMPStream::Chunk & O) { data[2] = (O.len >> 8) & 0xFF; data[1] = (O.len >> 16) & 0xFF; tagTime(O.timestamp); + isKeyframe = ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)); return true; } @@ -794,11 +795,7 @@ bool FLV::Tag::MemLoader(char * D, unsigned int S, unsigned int & P) { //read tag body if (MemReadUntil(data, len, sofar, D, S, P)) { //calculate keyframeness, next time read header again, return true - if ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)) { - isKeyframe = true; - } else { - isKeyframe = false; - } + isKeyframe = ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)); done = true; sofar = 0; return true; @@ -892,11 +889,7 @@ bool FLV::Tag::FileLoader(FILE * f) { //read tag body if (FileReadUntil(data, len, sofar, f)) { //calculate keyframeness, next time read header again, return true - if ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)) { - isKeyframe = true; - } else { - isKeyframe = false; - } + isKeyframe = ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)); done = true; sofar = 0; fcntl(fileno(f), F_SETFL, preflags); @@ -944,9 +937,7 @@ unsigned int FLV::Tag::getDataLen(){ return len - 16; } -JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack) { - JSON::Value pack_out; // Storage for outgoing metadata. - +void FLV::Tag::toMeta(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack){ if (!reTrack){ switch (data[0]){ case 0x09: reTrack = 1; break;//video @@ -954,7 +945,6 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u case 0x12: reTrack = 3; break;//meta } } - pack_out["trackid"] = reTrack; if (data[0] == 0x12) { AMF::Object meta_in = AMF::parse((unsigned char *)data + 11, len - 15); @@ -968,78 +958,56 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u } if (tmp) { amf_storage = *tmp; - bool empty = true; - for (int i = 0; i < tmp->hasContent(); ++i) { - if (tmp->getContentP(i)->Indice() == "videocodecid" || tmp->getContentP(i)->Indice() == "audiocodecid" || tmp->getContentP(i)->Indice() == "width" || tmp->getContentP(i)->Indice() == "height" || tmp->getContentP(i)->Indice() == "videodatarate" || tmp->getContentP(i)->Indice() == "videoframerate" || tmp->getContentP(i)->Indice() == "audiodatarate" || tmp->getContentP(i)->Indice() == "audiosamplerate" || tmp->getContentP(i)->Indice() == "audiosamplesize" || tmp->getContentP(i)->Indice() == "audiochannels") { - continue; - } - if (tmp->getContentP(i)->NumValue()) { - pack_out["data"][tmp->getContentP(i)->Indice()] = (long long)tmp->getContentP(i)->NumValue(); - empty = false; - } else { - if (tmp->getContentP(i)->StrValue() != "") { - pack_out["data"][tmp->getContentP(i)->Indice()] = tmp->getContentP(i)->StrValue(); - empty = false; - } - } - } - if (!empty) { - pack_out["datatype"] = "meta"; - pack_out["time"] = tagTime(); - }else{ - pack_out.null(); - } } - return pack_out; //empty + return; } - if (data[0] == 0x08) { + if (data[0] == 0x08 && (metadata.tracks[reTrack].codec == "" || metadata.tracks[reTrack].codec != getAudioCodec() || (needsInitData() && isInitData()))) { char audiodata = data[11]; metadata.tracks[reTrack].trackID = reTrack; metadata.tracks[reTrack].type = "audio"; - if (metadata.tracks[reTrack].codec == "" || metadata.tracks[reTrack].codec != getAudioCodec()) { - metadata.tracks[reTrack].codec = getAudioCodec(); - switch (audiodata & 0x0C) { - case 0x0: - metadata.tracks[reTrack].rate = 5512; - break; - case 0x4: - metadata.tracks[reTrack].rate = 11025; - break; - case 0x8: - metadata.tracks[reTrack].rate = 22050; - break; - case 0xC: - metadata.tracks[reTrack].rate = 44100; - break; - } - if (amf_storage.getContentP("audiosamplerate")) { - metadata.tracks[reTrack].rate = (long long int)amf_storage.getContentP("audiosamplerate")->NumValue(); - } - switch (audiodata & 0x02) { - case 0x0: - metadata.tracks[reTrack].size = 8; - break; - case 0x2: - metadata.tracks[reTrack].size = 16; - break; - } - if (amf_storage.getContentP("audiosamplesize")) { - metadata.tracks[reTrack].size = (long long int)amf_storage.getContentP("audiosamplesize")->NumValue(); - } - switch (audiodata & 0x01) { - case 0x0: - metadata.tracks[reTrack].channels = 1; - break; - case 0x1: - metadata.tracks[reTrack].channels = 2; - break; - } - if (amf_storage.getContentP("stereo")) { - if (amf_storage.getContentP("stereo")->NumValue() == 1) { - metadata.tracks[reTrack].channels = 2; - } else { - metadata.tracks[reTrack].channels = 1; - } + metadata.tracks[reTrack].codec = getAudioCodec(); + + switch (audiodata & 0x0C) { + case 0x0: + metadata.tracks[reTrack].rate = 5512; + break; + case 0x4: + metadata.tracks[reTrack].rate = 11025; + break; + case 0x8: + metadata.tracks[reTrack].rate = 22050; + break; + case 0xC: + metadata.tracks[reTrack].rate = 44100; + break; + } + if (amf_storage.getContentP("audiosamplerate")) { + metadata.tracks[reTrack].rate = (long long int)amf_storage.getContentP("audiosamplerate")->NumValue(); + } + switch (audiodata & 0x02) { + case 0x0: + metadata.tracks[reTrack].size = 8; + break; + case 0x2: + metadata.tracks[reTrack].size = 16; + break; + } + if (amf_storage.getContentP("audiosamplesize")) { + metadata.tracks[reTrack].size = (long long int)amf_storage.getContentP("audiosamplesize")->NumValue(); + } + switch (audiodata & 0x01) { + case 0x0: + metadata.tracks[reTrack].channels = 1; + break; + case 0x1: + metadata.tracks[reTrack].channels = 2; + break; + } + if (amf_storage.getContentP("stereo")) { + if (amf_storage.getContentP("stereo")->NumValue() == 1) { + metadata.tracks[reTrack].channels = 2; + } else { + metadata.tracks[reTrack].channels = 1; } } if (needsInitData() && isInitData()) { @@ -1048,54 +1016,36 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u } else { metadata.tracks[reTrack].init = std::string((char *)data + 12, (size_t)len - 16); } - pack_out.null(); - return pack_out; //skip rest of parsing, get next tag. } - pack_out["time"] = tagTime(); - if ((audiodata & 0xF0) == 0xA0) { - if (len < 18) { - return JSON::Value(); - } - pack_out["data"] = std::string((char *)data + 13, (size_t)len - 17); - } else { - if (len < 17) { - return JSON::Value(); - } - pack_out["data"] = std::string((char *)data + 12, (size_t)len - 16); - } - return pack_out; } - if (data[0] == 0x09) { + + if (data[0] == 0x09 && ((needsInitData() && isInitData()) || !metadata.tracks[reTrack].codec.size())){ char videodata = data[11]; - if (metadata.tracks[reTrack].codec == "") { - metadata.tracks[reTrack].codec = getVideoCodec(); - } + metadata.tracks[reTrack].codec = getVideoCodec(); metadata.tracks[reTrack].type = "video"; metadata.tracks[reTrack].trackID = reTrack; - if (!metadata.tracks[reTrack].width || !metadata.tracks[reTrack].height){ - if (amf_storage.getContentP("width")) { - metadata.tracks[reTrack].width = (long long int)amf_storage.getContentP("width")->NumValue(); - } - if (amf_storage.getContentP("height")) { - metadata.tracks[reTrack].height = (long long int)amf_storage.getContentP("height")->NumValue(); - } + if (amf_storage.getContentP("width")) { + metadata.tracks[reTrack].width = (long long int)amf_storage.getContentP("width")->NumValue(); + } + if (amf_storage.getContentP("height")) { + metadata.tracks[reTrack].height = (long long int)amf_storage.getContentP("height")->NumValue(); } if (!metadata.tracks[reTrack].fpks && amf_storage.getContentP("videoframerate")) { if (amf_storage.getContentP("videoframerate")->NumValue()){ metadata.tracks[reTrack].fpks = (long long int)(amf_storage.getContentP("videoframerate")->NumValue() * 1000.0); }else{ - metadata.tracks[reTrack].fpks = JSON::Value(amf_storage.getContentP("videoframerate")->StrValue()).asInt() * 1000.0; + metadata.tracks[reTrack].fpks = atoi(amf_storage.getContentP("videoframerate")->StrValue().c_str()) * 1000.0; } } if (needsInitData() && isInitData()) { if ((videodata & 0x0F) == 7) { if (len < 21) { - return JSON::Value(); + return; } metadata.tracks[reTrack].init = std::string((char *)data + 16, (size_t)len - 20); } else { if (len < 17) { - return JSON::Value(); + return; } metadata.tracks[reTrack].init = std::string((char *)data + 12, (size_t)len - 16); } @@ -1108,48 +1058,9 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u metadata.tracks[reTrack].height = spsChar.height; metadata.tracks[reTrack].fpks = spsChar.fps * 1000; } - pack_out.null(); - return pack_out; //skip rest of parsing, get next tag. } - switch (videodata & 0xF0) { - case 0x10: - case 0x40: - pack_out["keyframe"] = 1; - break; - case 0x50: - return JSON::Value(); - break; //the video info byte we just throw away - useless to us... - } - pack_out["time"] = tagTime(); - if (!getDataLen()){ - //empty packet - pack_out["data"] = ""; - return pack_out; - } - if ((videodata & 0x0F) == 7) { - switch (data[12]) { - case 1: - pack_out["nalu"] = 1; - break; - case 2: - pack_out["nalu_end"] = 1; - break; - } - pack_out["offset"] = offset(); - if (len < 21) { - return JSON::Value(); - } - pack_out["data"] = std::string((char *)data + 16, (size_t)len - 20); - } else { - if (len < 17) { - return JSON::Value(); - } - pack_out["data"] = std::string((char *)data + 12, (size_t)len - 16); - } - return pack_out; } - return pack_out; //should never get here -} //FLV::Tag::toJSON +} /// Checks if buf is large enough to contain len. /// Attempts to resize data buffer if not/ diff --git a/lib/flv_tag.h b/lib/flv_tag.h index b91fe53e..4e430830 100644 --- a/lib/flv_tag.h +++ b/lib/flv_tag.h @@ -4,7 +4,6 @@ #pragma once #include "socket.h" #include "dtsc.h" -#include "json.h" #include "amf.h" #include @@ -51,7 +50,7 @@ namespace FLV { bool DTSCVideoInit(DTSC::Track & video); bool DTSCAudioInit(DTSC::Track & audio); bool DTSCMetaInit(DTSC::Meta & M, std::set & selTracks); - JSON::Value toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack = 0); + void toMeta(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack = 0); bool MemLoader(char * D, unsigned int S, unsigned int & P); bool FileLoader(FILE * f); unsigned int getTrackID(); @@ -66,10 +65,6 @@ namespace FLV { //loader helper functions bool MemReadUntil(char * buffer, unsigned int count, unsigned int & sofar, char * D, unsigned int S, unsigned int & P); bool FileReadUntil(char * buffer, unsigned int count, unsigned int & sofar, FILE * f); - //JSON writer helpers - void Meta_Put(JSON::Value & meta, std::string cat, std::string elem, std::string val); - void Meta_Put(JSON::Value & meta, std::string cat, std::string elem, uint64_t val); - bool Meta_Has(JSON::Value & meta, std::string cat, std::string elem); }; //Tag diff --git a/lib/util.cpp b/lib/util.cpp index 2c44cf47..0e1d39fb 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -1,4 +1,8 @@ +//This line will make ftello/fseeko work with 64 bits numbers +#define _FILE_OFFSET_BITS 64 + #include "util.h" +#include #include namespace Util { @@ -36,5 +40,19 @@ namespace Util { } return result.size() == positions.size(); } + + /// 64-bits version of ftell + uint64_t ftell(FILE * stream){ + /// \TODO Windows implementation (e.g. _ftelli64 ?) + return ftello(stream); + } + + /// 64-bits version of fseek + uint64_t fseek(FILE * stream, uint64_t offset, int whence){ + /// \TODO Windows implementation (e.g. _fseeki64 ?) + return fseeko(stream, offset, whence); + } + + } diff --git a/lib/util.h b/lib/util.h index df0a27b9..2bb6cd9a 100644 --- a/lib/util.h +++ b/lib/util.h @@ -3,4 +3,6 @@ namespace Util { bool stringScan(const std::string & src, const std::string & pattern, std::deque & result); + uint64_t ftell(FILE * stream); + uint64_t fseek(FILE * stream, uint64_t offset, int whence); } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 16795a58..574198e1 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -624,11 +624,11 @@ namespace Mist { } void inputBuffer::updateTrackMeta(unsigned long tNum) { - VERYHIGH_MSG("Updating meta for track %d", tNum); //Store a reference for easier access std::map & locations = bufferLocations[tNum]; char * mappedPointer = nProxy.metaPages[tNum].mapped; if (!mappedPointer){return;} + VERYHIGH_MSG("Updating meta for track %lu, %lu pages", tNum, locations.size()); //First detect all entries on metaPage for (int i = 0; i < 8192; i += 8) { @@ -637,11 +637,11 @@ namespace Mist { continue; } unsigned long keyNum = ntohl(tmpOffset[0]); - INSANE_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1])); //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. if (!locations.count(keyNum)) { locations[keyNum].curOffset = 0; + VERYHIGH_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1])); } locations[keyNum].pageNum = keyNum; locations[keyNum].keyNum = ntohl(tmpOffset[1]); diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 53412c9c..1065f6ef 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -53,33 +54,35 @@ namespace Mist { //See whether a separate header file exists. if (readExistingHeader()){return true;} //Create header file from FLV data - fseek(inFile, 13, SEEK_SET); + Util::fseek(inFile, 13, SEEK_SET); AMF::Object amf_storage; - JSON::Value lastPack; long long int lastBytePos = 13; + uint64_t bench = Util::getMicros(); while (!feof(inFile) && !FLV::Parse_Error){ if (tmpTag.FileLoader(inFile)){ - lastPack.null(); - lastPack = tmpTag.toJSON(myMeta, amf_storage); - lastPack["bpos"] = lastBytePos; - myMeta.update(lastPack); - lastBytePos = ftell(inFile); + tmpTag.toMeta(myMeta, amf_storage); + if (!tmpTag.getDataLen()){continue;} + if (tmpTag.needsInitData() && tmpTag.isInitData()){continue;} + myMeta.update(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); + lastBytePos = Util::ftell(inFile); } } + bench = Util::getMicros(bench); + INFO_MSG("Header generated in %llu ms: @%lld, %s, %s", bench/1000, lastBytePos, myMeta.vod?"VoD":"NOVoD", myMeta.live?"Live":"NOLive"); if (FLV::Parse_Error){ - std::cerr << FLV::Error_Str << std::endl; - return false; + FLV::Parse_Error = false; + ERROR_MSG("Stopping at FLV parse error @%lld: %s", lastBytePos, FLV::Error_Str.c_str()); } myMeta.toFile(config->getString("input") + ".dtsh"); return true; } void inputFLV::getNext(bool smart) { - long long int lastBytePos = ftell(inFile); + long long int lastBytePos = Util::ftell(inFile); while (!feof(inFile) && !FLV::Parse_Error){ if (tmpTag.FileLoader(inFile)){ if ( !selectedTracks.count(tmpTag.getTrackID())){ - lastBytePos = ftell(inFile); + lastBytePos = Util::ftell(inFile); continue; } break; @@ -90,11 +93,12 @@ namespace Mist { return; } if (FLV::Parse_Error){ - FAIL_MSG("FLV error: %s", FLV::Error_Str.c_str()); + FLV::Parse_Error = false; + FAIL_MSG("FLV error @ %lld: %s", lastBytePos, FLV::Error_Str.c_str()); thisPacket.null(); return; } - if (!tmpTag.getDataLen()){ + if (!tmpTag.getDataLen() || (tmpTag.needsInitData() && tmpTag.isInitData())){ return getNext(); } thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getData(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); //init packet from tmpTags data @@ -104,14 +108,14 @@ namespace Mist { //We will seek to the corresponding keyframe of the video track if selected, otherwise audio keyframe. //Flv files are never multi-track, so track 1 is video, track 2 is audio. int trackSeek = (selectedTracks.count(1) ? 1 : 2); - size_t seekPos = myMeta.tracks[trackSeek].keys[0].getBpos(); + uint64_t seekPos = myMeta.tracks[trackSeek].keys[0].getBpos(); for (unsigned int i = 0; i < myMeta.tracks[trackSeek].keys.size(); i++){ if (myMeta.tracks[trackSeek].keys[i].getTime() > seekTime){ break; } seekPos = myMeta.tracks[trackSeek].keys[i].getBpos(); } - fseek(inFile, seekPos, SEEK_SET); + Util::fseek(inFile, seekPos, SEEK_SET); } void inputFLV::trackSelect(std::string trackSpec) { diff --git a/src/io.cpp b/src/io.cpp index a38f0d6c..3530d2c6 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -417,16 +417,14 @@ namespace Mist { unsigned long tid = packet.getTrackId(); //Do nothing if the trackid is invalid if (!tid) { - INFO_MSG("Packet without trackid"); + WARN_MSG("Packet without trackid!"); return; } - //If the track is not negotiated yet, start the negotiation - if (!trackState.count(tid)) { - continueNegotiate(tid, myMeta); - } + //negotiate track ID if needed + continueNegotiate(tid, myMeta); //If the track is declined, stop here if (trackState[tid] == FILL_DEC) { - INFO_MSG("Track %lu Declined", tid); + INFO_MSG("Track %lu declined", tid); preBuffer[tid].clear(); return; } @@ -434,9 +432,12 @@ namespace Mist { if (trackState[tid] != FILL_ACC) { preBuffer[tid].push_back(packet); }else{ - while (preBuffer[tid].size()){ - bufferSinglePacket(preBuffer[tid].front(), myMeta); - preBuffer[tid].pop_front(); + if (preBuffer[tid].size()){ + INFO_MSG("Track %lu accepted", tid); + while (preBuffer[tid].size()){ + bufferSinglePacket(preBuffer[tid].front(), myMeta); + preBuffer[tid].pop_front(); + } } bufferSinglePacket(packet, myMeta); } @@ -448,9 +449,7 @@ namespace Mist { //This update needs to happen whether the track is accepted or not. bool isKeyframe = false; if (myMeta.tracks[tid].type == "video") { - if (packet.hasMember("keyframe") && packet.getFlag("keyframe")) { - isKeyframe = true; - } + isKeyframe = packet.getFlag("keyframe"); } else { if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0) { //Assume this is the first packet on the track @@ -467,6 +466,7 @@ namespace Mist { //This also happens in bufferNext, with the same rules if (myMeta.live){ if (packet.getTime() > 0xFFFF0000 && !myMeta.tracks[tid].lastms){ + INFO_MSG("Ignoring packet with unexpected timestamp"); return;//ignore bullshit timestamps } if (packet.getTime() < myMeta.tracks[tid].lastms){ @@ -509,6 +509,7 @@ namespace Mist { } //If we have no pages by track, we have not received a starting keyframe yet. Drop this packet. if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0){ + INFO_MSG("Track %lu not starting with a keyframe!", tid); return; } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 73dce935..a69fc9a1 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -936,16 +936,18 @@ namespace Mist { }else{ amf_storage = &(pushMeta.begin()->second); } - JSON::Value pack_out = F.toJSON(myMeta, *amf_storage, next.cs_id*3 + (F.data[0] == 0x09 ? 0 : (F.data[0] == 0x08 ? 1 : 2) )); - if ( !pack_out.isNull()){ + + unsigned int reTrack = next.cs_id*3 + (F.data[0] == 0x09 ? 1 : (F.data[0] == 0x08 ? 2 : 3)); + F.toMeta(myMeta, *amf_storage, reTrack); + if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ + thisPacket.genericFill(F.tagTime(), F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); if (!nProxy.userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); } - continueNegotiate(pack_out["trackid"].asInt()); nProxy.streamName = streamName; - bufferLivePacket(pack_out); + bufferLivePacket(thisPacket); } break; }