Improved performance of FLV input and generic input IO class.

This commit is contained in:
Thulinma 2015-10-06 16:36:06 +02:00
parent 16d38459b6
commit 6d25774b94
3 changed files with 23 additions and 20 deletions

View file

@ -6,7 +6,6 @@
#include <cstdio> #include <cstdio>
#include <string> #include <string>
#include <mist/stream.h> #include <mist/stream.h>
#include <mist/flv_tag.h>
#include <mist/defines.h> #include <mist/defines.h>
#include "input_flv.h" #include "input_flv.h"
@ -66,7 +65,6 @@ namespace Mist {
} }
//Create header file from FLV data //Create header file from FLV data
fseek(inFile, 13, SEEK_SET); fseek(inFile, 13, SEEK_SET);
FLV::Tag tmpTag;
AMF::Object amf_storage; AMF::Object amf_storage;
long long int lastBytePos = 13; long long int lastBytePos = 13;
while (!feof(inFile) && !FLV::Parse_Error){ while (!feof(inFile) && !FLV::Parse_Error){
@ -90,7 +88,6 @@ namespace Mist {
void inputFLV::getNext(bool smart) { void inputFLV::getNext(bool smart) {
long long int lastBytePos = ftell(inFile); long long int lastBytePos = ftell(inFile);
FLV::Tag tmpTag;
while (!feof(inFile) && !FLV::Parse_Error){ while (!feof(inFile) && !FLV::Parse_Error){
if (tmpTag.FileLoader(inFile)){ if (tmpTag.FileLoader(inFile)){
if ( !selectedTracks.count(tmpTag.getTrackID())){ if ( !selectedTracks.count(tmpTag.getTrackID())){

View file

@ -1,5 +1,6 @@
#include "input.h" #include "input.h"
#include <mist/dtsc.h> #include <mist/dtsc.h>
#include <mist/flv_tag.h>
namespace Mist { namespace Mist {
class inputFLV : public Input { class inputFLV : public Input {
@ -12,7 +13,7 @@ namespace Mist {
void getNext(bool smart = true); void getNext(bool smart = true);
void seek(int seekTime); void seek(int seekTime);
void trackSelect(std::string trackSpec); void trackSelect(std::string trackSpec);
FLV::Tag tmpTag;
FILE * inFile; FILE * inFile;
}; };
} }

View file

@ -214,11 +214,13 @@ namespace Mist {
INFO_MSG("Trying to buffer a packet on track %lu~>%lu, but no page is initialized", tid, mapTid); INFO_MSG("Trying to buffer a packet on track %lu~>%lu, but no page is initialized", tid, mapTid);
return; return;
} }
IPC::sharedPage & myPage = curPage[tid];
DTSCPageData & pageData = pagesByTrack[tid][curPageNum[tid]];
//Save the current write position //Save the current write position
size_t curOffset = pagesByTrack[tid][curPageNum[tid]].curOffset; size_t curOffset = pageData.curOffset;
//Do nothing when there is not enough free space on the page to add the packet. //Do nothing when there is not enough free space on the page to add the packet.
if (pagesByTrack[tid][curPageNum[tid]].dataSize - curOffset < pack.getDataLen()) { if (pageData.dataSize - curOffset < pack.getDataLen()) {
FAIL_MSG("Trying to buffer a packet on page %lu for track %lu~>%lu, but we have a size mismatch. The packet is %lu bytes long, so won't fit at offset %lu on a page of %lu bytes!", curPageNum[tid], tid, mapTid, pack.getDataLen(), curOffset, pagesByTrack[tid][curPageNum[tid]].dataSize); FAIL_MSG("Trying to buffer a packet on page %lu for track %lu~>%lu, but we have a size mismatch. The packet is %d bytes long, so won't fit at offset %lu on a page of %llu bytes!", curPageNum[tid], tid, mapTid, pack.getDataLen(), curOffset, pageData.dataSize);
return; return;
} }
@ -226,24 +228,27 @@ namespace Mist {
//First memcpy only the payload to the destination //First memcpy only the payload to the destination
//Leaves the 20 bytes inbetween empty to ensure the data is not accidentally read before it is complete //Leaves the 20 bytes inbetween empty to ensure the data is not accidentally read before it is complete
memcpy(curPage[tid].mapped + curOffset + 20, pack.getData() + 20, pack.getDataLen() - 20); memcpy(myPage.mapped + curOffset + 20, pack.getData() + 20, pack.getDataLen() - 20);
//Copy the remaing values in reverse order: //Copy the remaing values in reverse order:
//8 byte timestamp //8 byte timestamp
memcpy(curPage[tid].mapped + curOffset + 12, pack.getData() + 12, 8); memcpy(myPage.mapped + curOffset + 12, pack.getData() + 12, 8);
//The mapped track id //The mapped track id
((int *)(curPage[tid].mapped + curOffset + 8))[0] = htonl(mapTid); ((int *)(myPage.mapped + curOffset + 8))[0] = htonl(mapTid);
//Write the size and 'DTP2' bytes to conclude the packet and allow for reading it int size = Bit::btohl(pack.getData() + 4);
memcpy(curPage[tid].mapped + curOffset, pack.getData(), 8); //Write the size
Bit::htobl(myPage.mapped + curOffset + 4, size);
//write the 'DTP2' bytes to conclude the packet and allow for reading it
memcpy(myPage.mapped + curOffset, pack.getData(), 4);
if (myMeta.live){ if (myMeta.live){
//Update the metadata //Update the metadata
DTSC::Packet updatePack(curPage[tid].mapped + curOffset, pack.getDataLen(), true); DTSC::Packet updatePack(myPage.mapped + curOffset, size + 8, true);
myMeta.update(updatePack); myMeta.update(updatePack);
} }
//End of brain melt //End of brain melt
pagesByTrack[tid][curPageNum[tid]].curOffset += pack.getDataLen(); pageData.curOffset += size + 8;
} }
///Wraps up the buffering of a shared memory data page ///Wraps up the buffering of a shared memory data page
@ -320,7 +325,7 @@ namespace Mist {
///\param packet The packet to buffer ///\param packet The packet to buffer
void InOutBase::bufferLivePacket(JSON::Value & packet) { void InOutBase::bufferLivePacket(JSON::Value & packet) {
//Store the trackid for easier access //Store the trackid for easier access
unsigned long tid = packet["trackid"].asInt(); unsigned long tid = packet.getTrackId();
//Do nothing if the trackid is invalid //Do nothing if the trackid is invalid
if (!tid) { if (!tid) {
INFO_MSG("Packet without trackid"); INFO_MSG("Packet without trackid");
@ -357,7 +362,7 @@ namespace Mist {
///\todo Figure out how to act with declined track here ///\todo Figure out how to act with declined track here
bool isKeyframe = false; bool isKeyframe = false;
if (myMeta.tracks[tid].type == "video") { if (myMeta.tracks[tid].type == "video") {
if (packet.isMember("keyframe") && packet["keyframe"]) { if (packet.hasMember("keyframe") && packet.getFlag("keyframe")) {
isKeyframe = true; isKeyframe = true;
} }
} else { } else {
@ -366,7 +371,7 @@ namespace Mist {
isKeyframe = true; isKeyframe = true;
} else { } else {
unsigned long lastKey = pagesByTrack[tid].rbegin()->second.lastKeyTime; unsigned long lastKey = pagesByTrack[tid].rbegin()->second.lastKeyTime;
if (packet["time"].asInt() - lastKey > 5000) { if (packet.getTime() - lastKey > 5000) {
isKeyframe = true; isKeyframe = true;
} }
} }
@ -390,7 +395,7 @@ namespace Mist {
pagesByTrack[tid][nextPageNum].dataSize = (25 * 1024 * 1024); pagesByTrack[tid][nextPageNum].dataSize = (25 * 1024 * 1024);
pagesByTrack[tid][nextPageNum].pageNum = nextPageNum; pagesByTrack[tid][nextPageNum].pageNum = nextPageNum;
} }
pagesByTrack[tid].rbegin()->second.lastKeyTime = packet["time"].asInt(); pagesByTrack[tid].rbegin()->second.lastKeyTime = packet.getTime();
pagesByTrack[tid].rbegin()->second.keyNum++; pagesByTrack[tid].rbegin()->second.keyNum++;
} }
//Set the pageNumber if it has not been set yet //Set the pageNumber if it has not been set yet
@ -496,8 +501,8 @@ namespace Mist {
metaPages[tid].init(pageName, 8 * 1024 * 1024, true); metaPages[tid].init(pageName, 8 * 1024 * 1024, true);
metaPages[tid].master = false; metaPages[tid].master = false;
DTSC::Meta tmpMeta; DTSC::Meta tmpMeta;
tmpMeta.tracks[tid] = myMeta.tracks[tid]; tmpMeta.tracks[newTid] = myMeta.tracks[tid];
tmpMeta.tracks[tid].trackID = newTid; tmpMeta.tracks[newTid].trackID = newTid;
JSON::Value tmpVal = tmpMeta.toJSON(); JSON::Value tmpVal = tmpMeta.toJSON();
std::string tmpStr = tmpVal.toNetPacked(); std::string tmpStr = tmpVal.toNetPacked();
memcpy(metaPages[tid].mapped, tmpStr.data(), tmpStr.size()); memcpy(metaPages[tid].mapped, tmpStr.data(), tmpStr.size());