Fix for TS Input CPU Usage

This commit is contained in:
Erik Zandvliet 2015-10-05 17:08:10 +02:00
parent 8567e5192a
commit 01472ca88d
2 changed files with 40 additions and 34 deletions

View file

@ -143,9 +143,9 @@ namespace Mist {
//Live inputs only have a serve() mode //Live inputs only have a serve() mode
#ifndef INPUT_LIVE #ifndef INPUT_LIVE
if (!config->getString("streamname").size()){ if (!config->getString("streamname").size()) {
convert(); convert();
}else{ } else {
#endif #endif
serve(); serve();
#ifndef INPUT_LIVE #ifndef INPUT_LIVE
@ -154,11 +154,11 @@ namespace Mist {
return 0; return 0;
} }
void Input::convert(){ void Input::convert() {
//check filename for no - //check filename for no -
if (config->getString("output") != "-"){ if (config->getString("output") != "-") {
std::string filename = config->getString("output"); std::string filename = config->getString("output");
if (filename.size() < 5 || filename.substr(filename.size() - 5) != ".dtsc"){ if (filename.size() < 5 || filename.substr(filename.size() - 5) != ".dtsc") {
filename += ".dtsc"; filename += ".dtsc";
} }
//output to dtsc //output to dtsc
@ -168,7 +168,7 @@ namespace Mist {
long long int bpos = 0; long long int bpos = 0;
seek(0); seek(0);
getNext(); getNext();
while (thisPacket){ while (thisPacket) {
newMeta.updatePosOverride(thisPacket, bpos); newMeta.updatePosOverride(thisPacket, bpos);
file.write(thisPacket.getData(), thisPacket.getDataLen()); file.write(thisPacket.getData(), thisPacket.getDataLen());
bpos += thisPacket.getDataLen(); bpos += thisPacket.getDataLen();
@ -177,26 +177,26 @@ namespace Mist {
//close file //close file
file.close(); file.close();
//create header //create header
file.open((filename+".dtsh").c_str()); file.open((filename + ".dtsh").c_str());
file << newMeta.toJSON().toNetPacked(); file << newMeta.toJSON().toNetPacked();
file.close(); file.close();
}else{ } else {
DEBUG_MSG(DLVL_FAIL,"No filename specified, exiting"); DEBUG_MSG(DLVL_FAIL, "No filename specified, exiting");
} }
} }
void Input::serve(){ void Input::serve() {
char userPageName[NAME_BUFFER_SIZE]; char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
#ifdef INPUT_LIVE #ifdef INPUT_LIVE
Util::startInput(streamName); Util::startInput(streamName);
userClient = IPC::sharedClient(userPageName, 30, true); userClient = IPC::sharedClient(userPageName, 30, true);
getNext(); getNext();
while (thisPacket || config->is_active){ while (thisPacket || config->is_active) {
unsigned long tid = thisPacket.getTrackId(); unsigned long tid = thisPacket.getTrackId();
//Check for eligibility of track //Check for eligibility of track
IPC::userConnection userConn(userClient.getData()); IPC::userConnection userConn(userClient.getData());
if (trackOffset.count(tid) && !userConn.getTrackId(trackOffset[tid])){ if (trackOffset.count(tid) && !userConn.getTrackId(trackOffset[tid])) {
trackOffset.erase(tid); trackOffset.erase(tid);
trackState.erase(tid); trackState.erase(tid);
trackMap.erase(tid); trackMap.erase(tid);
@ -208,9 +208,11 @@ namespace Mist {
INFO_MSG("Erasing track %d", tid); INFO_MSG("Erasing track %d", tid);
continue; continue;
} }
if (thisPacket){ if (thisPacket) {
continueNegotiate(thisPacket.getTrackId()); continueNegotiate(thisPacket.getTrackId());
bufferLivePacket(thisPacket); bufferLivePacket(thisPacket);
} else {
Util::sleep(100);
} }
getNext(); getNext();
userClient.keepAlive(); userClient.keepAlive();
@ -218,29 +220,29 @@ namespace Mist {
userClient.finish(); userClient.finish();
#else #else
userPage.init(userPageName, PLAY_EX_SIZE, true); userPage.init(userPageName, PLAY_EX_SIZE, true);
if (!isBuffer){ if (!isBuffer) {
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
bufferFrame(it->first, 1); bufferFrame(it->first, 1);
} }
} }
DEBUG_MSG(DLVL_DEVEL,"Input for stream %s started", streamName.c_str()); DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str());
long long int activityCounter = Util::bootSecs(); long long int activityCounter = Util::bootSecs();
while ((Util::bootSecs() - activityCounter) < 10 && config->is_active){//10 second timeout while ((Util::bootSecs() - activityCounter) < 10 && config->is_active) { //10 second timeout
Util::wait(1000); Util::wait(1000);
userPage.parseEach(callbackWrapper); userPage.parseEach(callbackWrapper);
removeUnused(); removeUnused();
if (userPage.amount){ if (userPage.amount) {
activityCounter = Util::bootSecs(); activityCounter = Util::bootSecs();
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount); DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount);
}else{ } else {
DEBUG_MSG(DLVL_INSANE, "Timer running"); DEBUG_MSG(DLVL_INSANE, "Timer running");
} }
} }
#endif #endif
finish(); finish();
DEBUG_MSG(DLVL_DEVEL,"Input for stream %s closing clean", streamName.c_str()); DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str());
//end player functionality //end player functionality
} }
@ -386,13 +388,13 @@ namespace Mist {
} }
} }
} }
bool Input::bufferFrame(unsigned int track, unsigned int keyNum){ bool Input::bufferFrame(unsigned int track, unsigned int keyNum) {
VERYHIGH_MSG("bufferFrame for stream %s, track %u, key %u", streamName.c_str(), track, keyNum); VERYHIGH_MSG("bufferFrame for stream %s, track %u, key %u", streamName.c_str(), track, keyNum);
if (keyNum >= myMeta.tracks[track].keys.size()){ if (keyNum >= myMeta.tracks[track].keys.size()) {
//End of movie here, returning true to avoid various error messages //End of movie here, returning true to avoid various error messages
VERYHIGH_MSG("Key number is higher than total key count. Cancelling bufferFrame"); VERYHIGH_MSG("Key number is higher than total key count. Cancelling bufferFrame");
return true; return true;
} }
if (keyNum < 1) { if (keyNum < 1) {
@ -409,18 +411,18 @@ namespace Mist {
} }
} }
pageCounter[track][pageNumber] = 15; pageCounter[track][pageNumber] = 15;
VERYHIGH_MSG("Track %u, key %u is already buffered in page %d. Cancelling bufferFrame", track, keyNum, pageNumber); VERYHIGH_MSG("Track %u, key %u is already buffered in page %d. Cancelling bufferFrame", track, keyNum, pageNumber);
return true; return true;
} }
if (!pagesByTrack.count(track)){ if (!pagesByTrack.count(track)) {
WARN_MSG("No pages for track %u found! Cancelling bufferFrame", track); WARN_MSG("No pages for track %u found! Cancelling bufferFrame", track);
return false; return false;
} }
//Update keynum to point to the corresponding page //Update keynum to point to the corresponding page
INFO_MSG("Loading key %u from page %lu", keyNum, (--(pagesByTrack[track].upper_bound(keyNum)))->first); INFO_MSG("Loading key %u from page %lu", keyNum, (--(pagesByTrack[track].upper_bound(keyNum)))->first);
keyNum = (--(pagesByTrack[track].upper_bound(keyNum)))->first; keyNum = (--(pagesByTrack[track].upper_bound(keyNum)))->first;
if (!bufferStart(track, keyNum)){ if (!bufferStart(track, keyNum)) {
WARN_MSG("bufferStart failed! Cancelling bufferFrame"); WARN_MSG("bufferStart failed! Cancelling bufferFrame");
return false; return false;
} }

View file

@ -177,6 +177,10 @@ namespace Mist {
return; return;
} }
unsigned long mapTid = trackMap[tid]; unsigned long mapTid = trackMap[tid];
if (!pagesByTrack.count(tid)){
//The buffer does not control the datapages, indicated by no pagesByTrack entry.
return;
}
//If the given pagenumber is not a valid page on this track, do nothing //If the given pagenumber is not a valid page on this track, do nothing
if (!pagesByTrack[tid].count(pageNumber)){ if (!pagesByTrack[tid].count(pageNumber)){
INFO_MSG("Can't remove page %lu on track %lu~>%lu as it is not a valid page number.", pageNumber, tid, mapTid); INFO_MSG("Can't remove page %lu on track %lu~>%lu as it is not a valid page number.", pageNumber, tid, mapTid);
@ -192,7 +196,7 @@ namespace Mist {
#else #else
toErase.init(pageName, pagesByTrack[tid][pageNumber].dataSize, false); toErase.init(pageName, pagesByTrack[tid][pageNumber].dataSize, false);
#endif #endif
//Set the master flag so that the page will be destoryed once it leaves scope //Set the master flag so that the page will be destroyed once it leaves scope
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
IPC::releasePage(pageName); IPC::releasePage(pageName);
#endif #endif