Backported various fixes by Erik Zandvliet, also added better handling of re-pushes.

This commit is contained in:
Thulinma 2015-05-21 20:15:09 +02:00
parent 17aa6bbbb6
commit 6b2a158d9c
3 changed files with 55 additions and 11 deletions

View file

@ -141,15 +141,32 @@ namespace Mist {
return true;
}
void inputBuffer::eraseTrackDataPages(unsigned long tid){
if (!bufferLocations.count(tid)){
return;
}
for (std::map<unsigned long, DTSCPageData>::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++){
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tid, it->first);
curPage[tid].init(thisPageName, 20971520, false, false);
curPage[tid].master = true;
curPage.erase(tid);
}
bufferLocations.erase(tid);
metaPages[tid].master = true;
metaPages.erase(tid);
}
void inputBuffer::finish() {
Input::finish();
updateMeta();
if (bufferLocations.size()){
std::set<unsigned long> toErase;
for (std::map<unsigned long, std::map<unsigned long, DTSCPageData> >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++){
for (std::map<unsigned long, DTSCPageData>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) {
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), it->first, it2->first);
curPage[it->first].init(thisPageName, 20971520, false, false);
curPage[it->first].master = true;
curPage.erase(it->first);
toErase.insert(it->first);
}
for (std::set<unsigned long>::iterator it = toErase.begin(); it != toErase.end(); ++it){
eraseTrackDataPages(*it);
}
}
}
@ -262,9 +279,13 @@ namespace Mist {
metaPages.erase(value);
}
if (activeTracks.count(value)) {
updateMeta();
eraseTrackDataPages(value);
activeTracks.erase(value);
bufferLocations.erase(value);
}
metaPages[value].master = true;
metaPages.erase(value);
continue;
}
}
@ -298,7 +319,11 @@ namespace Mist {
}
//If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later
if (!metaPages[value].mapped) {
///\todo Maybe add a timeout counter here, for when we dont expect the track to appear anymore
//remove the negotiation if it has timed out
if (++negotiationTimeout[value] >= 1000){
negotiatingTracks.erase(value);
negotiationTimeout.erase(value);
}
continue;
}
@ -316,6 +341,14 @@ namespace Mist {
DTSC::Meta trackMeta(tempJSONForMeta);
//If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call.
if (!trackMeta.tracks.count(value)) {
//remove the negotiation if it has timed out
if (++negotiationTimeout[value] >= 1000){
negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
metaPages[value].master = true;
metaPages.erase(value);
negotiationTimeout.erase(value);
}
continue;
}
@ -335,15 +368,17 @@ namespace Mist {
//or if the firstms of the replacement track is later than the lastms on the existing track
if (!myMeta.tracks.count(finalMap) || trackMeta.tracks.find(value)->second.keys.size() > 1 || trackMeta.tracks.find(value)->second.firstms >= myMeta.tracks[finalMap].lastms) {
if (myMeta.tracks.count(finalMap) && myMeta.tracks[finalMap].lastms > 0) {
INFO_MSG("Resume of track %d detected, coming from temporary track %lu of user %u", finalMap, value, id);
INFO_MSG("Resume of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
} else {
INFO_MSG("New track detected, assigned track id %d, coming from temporary track %lu of user %u", finalMap, value, id);
INFO_MSG("New track detected, assigned track id %lu, coming from temporary track %lu of user %u", finalMap, value, id);
}
} else {
//Otherwise replace existing track
INFO_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
myMeta.tracks.erase(finalMap);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
updateMeta();
eraseTrackDataPages(value);
metaPages[finalMap].master = true;
metaPages.erase(finalMap);
bufferLocations.erase(finalMap);
@ -432,7 +467,7 @@ namespace Mist {
//Otherwise open and parse the page
//Open the page if it is not yet open
if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum) {
if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum || !curPage[tNum].mapped){
//DO NOT ERASE THE PAGE HERE, master is not set to true
curPageNum.erase(tNum);
char nextPageName[NAME_BUFFER_SIZE];
@ -448,6 +483,10 @@ namespace Mist {
DTSC::Packet tmpPack;
if (!curPage[tNum].mapped[pageData.curOffset]){
VERYHIGH_MSG("No packet on page %lu for track %lu, waiting...", pageNum, tNum);
return;
}
tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0);
//No new data has been written on the page since last update
if (!tmpPack) {

View file

@ -22,11 +22,13 @@ namespace Mist {
void trackSelect(std::string trackSpec);
bool removeKey(unsigned int tid);
void removeUnused();
void eraseTrackDataPages(unsigned long tid);
void finish();
void userCallback(char * data, size_t len, unsigned int id);
std::set<unsigned long> negotiatingTracks;
std::set<unsigned long> activeTracks;
std::map<unsigned long, unsigned long long> lastUpdated;
std::map<unsigned long, unsigned long long> negotiationTimeout;
///Maps trackid to a pagenum->pageData map
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > bufferLocations;
std::map<unsigned long, char *> pushLocation;

View file

@ -531,6 +531,9 @@ namespace Mist {
INFO_MSG("Buffer has indicated that incoming track %lu should start writing on track %lu, page %lu", tid, finalTid, firstPage);
trackMap[tid] = finalTid;
if (myMeta.tracks.count(finalTid) && myMeta.tracks[finalTid].lastms){
myMeta.tracks[finalTid].lastms = 0;
}
trackState[tid] = FILL_ACC;
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), finalTid);