Fixed removing pages at the right points.

This commit is contained in:
Erik Zandvliet 2015-05-20 14:45:38 +02:00
parent 049e9babe0
commit 926dd01995
2 changed files with 144 additions and 113 deletions

View file

@ -17,7 +17,7 @@
#endif #endif
namespace Mist { namespace Mist {
inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg) { inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg){
capa["name"] = "Buffer"; capa["name"] = "Buffer";
JSON::Value option; JSON::Value option;
option["arg"] = "integer"; option["arg"] = "integer";
@ -82,23 +82,25 @@ namespace Mist {
segmentSize = 0; segmentSize = 0;
} }
inputBuffer::~inputBuffer() { inputBuffer::~inputBuffer(){
config->is_active = false; config->is_active = false;
if (myMeta.tracks.size()) { if (myMeta.tracks.size()){
DEBUG_MSG(DLVL_DEVEL, "Cleaning up, removing last keyframes"); DEBUG_MSG(DLVL_DEVEL, "Cleaning up, removing last keyframes");
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[it->first]; std::map<unsigned long, DTSCPageData> & locations = bufferLocations[it->first];
if (!metaPages.count(it->first) || !metaPages[it->first].mapped){
continue;
}
//First detect all entries on metaPage //First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) { for (int i = 0; i < 8192; i += 8){
int * tmpOffset = (int *)(metaPages[it->first].mapped + i); int * tmpOffset = (int *)(metaPages[it->first].mapped + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0) { if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
continue; continue;
} }
unsigned long keyNum = ntohl(tmpOffset[0]); unsigned long keyNum = ntohl(tmpOffset[0]);
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)) { if (!locations.count(keyNum)){
locations[keyNum].curOffset = 0; locations[keyNum].curOffset = 0;
} }
locations[keyNum].pageNum = keyNum; locations[keyNum].pageNum = keyNum;
@ -114,14 +116,14 @@ namespace Mist {
} }
} }
void inputBuffer::updateMeta() { void inputBuffer::updateMeta(){
long long unsigned int firstms = 0xFFFFFFFFFFFFFFFFull; long long unsigned int firstms = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int lastms = 0; long long unsigned int lastms = 0;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.firstms < firstms) { if (it->second.firstms < firstms){
firstms = it->second.firstms; firstms = it->second.firstms;
} }
if (it->second.firstms > lastms) { if (it->second.firstms > lastms){
lastms = it->second.lastms; lastms = it->second.lastms;
} }
} }
@ -132,7 +134,7 @@ namespace Mist {
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1); IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.wait(); liveMeta.wait();
if (!metaPages.count(0) || !metaPages[0].mapped) { if (!metaPages.count(0) || !metaPages[0].mapped){
char pageName[NAME_BUFFER_SIZE]; char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
metaPages[0].init(pageName, 8 * 1024 * 1024, true); metaPages[0].init(pageName, 8 * 1024 * 1024, true);
@ -143,19 +145,19 @@ namespace Mist {
liveMeta.post(); liveMeta.post();
} }
bool inputBuffer::removeKey(unsigned int tid) { bool inputBuffer::removeKey(unsigned int tid){
if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active) { if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active){
return false; return false;
} }
if (!myMeta.tracks[tid].keys.size()) { if (!myMeta.tracks[tid].keys.size()){
return false; return false;
} }
DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber()); DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber());
//remove all parts of this key //remove all parts of this key
for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) { for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++){
/*LTS-START*/ /*LTS-START*/
if (recFile.is_open()) { if (recFile.is_open()){
if (!recMeta.tracks.count(tid)) { if (!recMeta.tracks.count(tid)){
recMeta.tracks[tid] = myMeta.tracks[tid]; recMeta.tracks[tid] = myMeta.tracks[tid];
recMeta.tracks[tid].reset(); recMeta.tracks[tid].reset();
} }
@ -177,7 +179,7 @@ namespace Mist {
DTSC::Packet recPack; DTSC::Packet recPack;
int pageLen = dataPages[tid][bufferLocations[tid].begin()->first].len; int pageLen = dataPages[tid][bufferLocations[tid].begin()->first].len;
char * pageMapped = dataPages[tid][bufferLocations[tid].begin()->first].mapped; char * pageMapped = dataPages[tid][bufferLocations[tid].begin()->first].mapped;
while( bpos < (unsigned long long)pageLen) { while( bpos < (unsigned long long)pageLen){
int tmpSize = ((int)pageMapped[bpos + 4] << 24) | ((int)pageMapped[bpos + 5] << 16) | ((int)pageMapped[bpos + 6] << 8) | (int)pageMapped[bpos + 7]; int tmpSize = ((int)pageMapped[bpos + 4] << 24) | ((int)pageMapped[bpos + 5] << 16) | ((int)pageMapped[bpos + 6] << 8) | (int)pageMapped[bpos + 7];
tmpSize += 8; tmpSize += 8;
recPack.reInit(pageMapped + bpos, tmpSize, true); recPack.reInit(pageMapped + bpos, tmpSize, true);
@ -212,27 +214,27 @@ namespace Mist {
//re-calculate firstms //re-calculate firstms
myMeta.tracks[tid].firstms = myMeta.tracks[tid].keys[0].getTime(); myMeta.tracks[tid].firstms = myMeta.tracks[tid].keys[0].getTime();
//delete the fragment if it's no longer fully buffered //delete the fragment if it's no longer fully buffered
if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()) { if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()){
myMeta.tracks[tid].fragments.pop_front(); myMeta.tracks[tid].fragments.pop_front();
myMeta.tracks[tid].missedFrags ++; myMeta.tracks[tid].missedFrags ++;
} }
//if there is more than one page buffered for this track... //if there is more than one page buffered for this track...
if (bufferLocations[tid].size() > 1) { if (bufferLocations[tid].size() > 1){
//Check if the first key starts on the second page or higher //Check if the first key starts on the second page or higher
if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active) { if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){
//Find page in indexpage and null it //Find page in indexpage and null it
for (int i = 0; i < 8192; i += 8) { for (int i = 0; i < 8192; i += 8){
unsigned int thisKeyNum = ((((long long int *)(metaPages[tid].mapped + i))[0]) >> 32) & 0xFFFFFFFF; unsigned int thisKeyNum = ((((long long int *)(metaPages[tid].mapped + i))[0]) >> 32) & 0xFFFFFFFF;
if (thisKeyNum == htonl(pagesByTrack[tid].begin()->first) && ((((long long int *)(metaPages[tid].mapped + i))[0]) != 0)) { if (thisKeyNum == htonl(pagesByTrack[tid].begin()->first) && ((((long long int *)(metaPages[tid].mapped + i))[0]) != 0)){
(((long long int *)(metaPages[tid].mapped + i))[0]) = 0; (((long long int *)(metaPages[tid].mapped + i))[0]) = 0;
} }
} }
DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
bufferRemove(tid, bufferLocations[tid].begin()->first); bufferRemove(tid, bufferLocations[tid].begin()->first);
for (int i = 0; i < 1024; i++) { for (int i = 0; i < 1024; i++){
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8)); int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
int tmpNum = ntohl(tmpOffset[0]); int tmpNum = ntohl(tmpOffset[0]);
if (tmpNum == bufferLocations[tid].begin()->first) { if (tmpNum == bufferLocations[tid].begin()->first){
tmpOffset[0] = 0; tmpOffset[0] = 0;
tmpOffset[1] = 0; tmpOffset[1] = 0;
} }
@ -253,42 +255,53 @@ namespace Mist {
return true; return true;
} }
void inputBuffer::finish() { void inputBuffer::eraseTrackDataPages(unsigned long tid){
Input::finish(); if (!bufferLocations.count(tid)){
for (std::map<unsigned long, std::map<unsigned long, DTSCPageData> >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++) { return;
for (std::map<unsigned long, DTSCPageData>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) {
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), it->first, it2->first);
curPage[it->first].init(thisPageName, 20971520, false, false);
curPage[it->first].master = true;
curPage.erase(it->first);
} }
for (std::map<unsigned long, DTSCPageData>::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++){
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tid, it->first);
curPage[tid].init(thisPageName, 20971520, false, false);
curPage[tid].master = true;
curPage.erase(tid);
}
bufferLocations.erase(tid);
metaPages[tid].master = true;
metaPages.erase(tid);
}
void inputBuffer::finish(){
Input::finish();
updateMeta();
for (std::map<unsigned long, std::map<unsigned long, DTSCPageData> >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++){
eraseTrackDataPages(it->first);
} }
} }
void inputBuffer::removeUnused() { void inputBuffer::removeUnused(){
//first remove all tracks that have not been updated for too long //first remove all tracks that have not been updated for too long
bool changed = true; bool changed = true;
while (changed) { while (changed){
changed = false; changed = false;
long long unsigned int time = Util::bootSecs(); long long unsigned int time = Util::bootSecs();
long long unsigned int compareFirst = 0xFFFFFFFFFFFFFFFFull; long long unsigned int compareFirst = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int compareLast = 0; long long unsigned int compareLast = 0;
//for tracks that were updated in the last 5 seconds, get the first and last ms edges. //for tracks that were updated in the last 5 seconds, get the first and last ms edges.
for (std::map<unsigned int, DTSC::Track>::iterator it2 = myMeta.tracks.begin(); it2 != myMeta.tracks.end(); it2++) { for (std::map<unsigned int, DTSC::Track>::iterator it2 = myMeta.tracks.begin(); it2 != myMeta.tracks.end(); it2++){
if ((time - lastUpdated[it2->first]) > 5) { if ((time - lastUpdated[it2->first]) > 5){
continue; continue;
} }
if (it2->second.lastms > compareLast) { if (it2->second.lastms > compareLast){
compareLast = it2->second.lastms; compareLast = it2->second.lastms;
} }
if (it2->second.firstms < compareFirst) { if (it2->second.firstms < compareFirst){
compareFirst = it2->second.firstms; compareFirst = it2->second.firstms;
} }
} }
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
//if not updated for an entire buffer duration, or last updated track and this track differ by an entire buffer duration, erase the track. //if not updated for an entire buffer duration, or last updated track and this track differ by an entire buffer duration, erase the track.
if ((time - lastUpdated[it->first]) > (bufferTime / 1000) || (compareLast && (time - lastUpdated[it->first]) > 5 && ((myMeta.tracks[it->first].firstms - compareLast) > bufferTime || (compareFirst - myMeta.tracks[it->first].lastms) > bufferTime))) { if ((time - lastUpdated[it->first]) > (bufferTime / 1000) || (compareLast && (time - lastUpdated[it->first]) > 5 && ((myMeta.tracks[it->first].firstms - compareLast) > bufferTime || (compareFirst - myMeta.tracks[it->first].lastms) > bufferTime))){
unsigned int tid = it->first; unsigned int tid = it->first;
//erase this track //erase this track
INFO_MSG("Erasing track %d because of timeout", it->first); INFO_MSG("Erasing track %d because of timeout", it->first);
@ -314,29 +327,29 @@ namespace Mist {
} }
//find the earliest video keyframe stored //find the earliest video keyframe stored
unsigned int firstVideo = 1; unsigned int firstVideo = 1;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "video") { if (it->second.type == "video"){
if (it->second.firstms < firstVideo || firstVideo == 1) { if (it->second.firstms < firstVideo || firstVideo == 1){
firstVideo = it->second.firstms; firstVideo = it->second.firstms;
} }
} }
} }
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
//non-video tracks need to have a second keyframe that is <= firstVideo //non-video tracks need to have a second keyframe that is <= firstVideo
if (it->second.type != "video") { if (it->second.type != "video"){
if (it->second.keys.size() < 2 || it->second.keys[1].getTime() > firstVideo) { if (it->second.keys.size() < 2 || it->second.keys[1].getTime() > firstVideo){
continue; continue;
} }
} }
//Buffer cutting //Buffer cutting
while (it->second.keys.size() > 1 && it->second.keys[0].getTime() < cutTime) { while (it->second.keys.size() > 1 && it->second.keys[0].getTime() < cutTime){
if (!removeKey(it->first)) { if (!removeKey(it->first)){
break; break;
} }
} }
//Buffer size management //Buffer size management
while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime) { while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime){
if (!removeKey(it->first)) { if (!removeKey(it->first)){
break; break;
} }
} }
@ -344,10 +357,10 @@ namespace Mist {
updateMeta(); updateMeta();
} }
void inputBuffer::userCallback(char * data, size_t len, unsigned int id) { void inputBuffer::userCallback(char * data, size_t len, unsigned int id){
/*LTS-START*/ /*LTS-START*/
//Reload the configuration to make sure we stay up to date with changes through the api //Reload the configuration to make sure we stay up to date with changes through the api
if (Util::epoch() - lastReTime > 4) { if (Util::epoch() - lastReTime > 4){
setup(); setup();
} }
/*LTS-END*/ /*LTS-END*/
@ -356,39 +369,41 @@ namespace Mist {
//Get the counter of this user //Get the counter of this user
char counter = (*(data - 1)); char counter = (*(data - 1));
//Each user can have at maximum SIMUL_TRACKS elements in their userpage. //Each user can have at maximum SIMUL_TRACKS elements in their userpage.
for (int index = 0; index < SIMUL_TRACKS; index++) { for (int index = 0; index < SIMUL_TRACKS; index++){
char * thisData = data + (index * 6); char * thisData = data + (index * 6);
//Get the track id from the current element //Get the track id from the current element
unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3]; unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3];
//Skip value 0xFFFFFFFF as this indicates a previously declined track //Skip value 0xFFFFFFFF as this indicates a previously declined track
if (value == 0xFFFFFFFF) { if (value == 0xFFFFFFFF){
continue; continue;
} }
//Skip value 0 as this indicates an empty track //Skip value 0 as this indicates an empty track
if (value == 0) { if (value == 0){
continue; continue;
} }
//If the current value indicates a valid trackid, and it is pushed from this user //If the current value indicates a valid trackid, and it is pushed from this user
if (pushLocation[value] == data) { if (pushLocation[value] == data){
//Check for timeouts, and erase the track if necessary //Check for timeouts, and erase the track if necessary
if (counter == 126 || counter == 127 || counter == 254 || counter == 255) { if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
pushLocation.erase(value); pushLocation.erase(value);
if (negotiatingTracks.count(value)) { if (negotiatingTracks.count(value)){
negotiatingTracks.erase(value); negotiatingTracks.erase(value);
metaPages[value].master = true;
metaPages.erase(value);
} }
if (activeTracks.count(value)) { if (activeTracks.count(value)){
updateMeta();
eraseTrackDataPages(value);
activeTracks.erase(value); activeTracks.erase(value);
bufferLocations.erase(value); bufferLocations.erase(value);
} }
metaPages[value].master = true;
metaPages.erase(value);
continue; continue;
} }
} }
//Track is set to "New track request", assign new track id and create shared memory page //Track is set to "New track request", assign new track id and create shared memory page
//This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process //This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process
if (value & 0x80000000) { if (value & 0x80000000){
//Set the temporary track id for this item, and increase the temporary value for use with the next track //Set the temporary track id for this item, and increase the temporary value for use with the next track
unsigned long long tempMapping = nextTempId++; unsigned long long tempMapping = nextTempId++;
//Add the temporary track id to the list of tracks that are currently being negotiated //Add the temporary track id to the list of tracks that are currently being negotiated
@ -407,16 +422,20 @@ namespace Mist {
} }
//The track id is set to the value of a track that we are currently negotiating about //The track id is set to the value of a track that we are currently negotiating about
if (negotiatingTracks.count(value)) { if (negotiatingTracks.count(value)){
//If the metadata page for this track is not yet registered, initialize it //If the metadata page for this track is not yet registered, initialize it
if (!metaPages.count(value) || !metaPages[value].mapped) { if (!metaPages.count(value) || !metaPages[value].mapped){
char tempMetaName[NAME_BUFFER_SIZE]; char tempMetaName[NAME_BUFFER_SIZE];
snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), value); snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), value);
metaPages[value].init(tempMetaName, 8388608, false, false); metaPages[value].init(tempMetaName, 8388608, false, false);
} }
//If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later //If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later
if (!metaPages[value].mapped) { if (!metaPages[value].mapped){
///\todo Maybe add a timeout counter here, for when we dont expect the track to appear anymore //remove the negotiation if it has timed out
if (++negotiationTimeout[value] >= 1000){
negotiatingTracks.erase(value);
negotiationTimeout.erase(value);
}
continue; continue;
} }
@ -433,7 +452,15 @@ namespace Mist {
//Construct a metadata object for the current track //Construct a metadata object for the current track
DTSC::Meta trackMeta(tempJSONForMeta); DTSC::Meta trackMeta(tempJSONForMeta);
//If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call. //If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call.
if (!trackMeta.tracks.count(value)) { if (!trackMeta.tracks.count(value)){
//remove the negotiation if it has timed out
if (++negotiationTimeout[value] >= 1000){
negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
metaPages[value].master = true;
metaPages.erase(value);
negotiationTimeout.erase(value);
}
continue; continue;
} }
@ -442,10 +469,10 @@ namespace Mist {
/*LTS-START*/ /*LTS-START*/
//Get the identifier for the track, and attempt colission detection. //Get the identifier for the track, and attempt colission detection.
int collidesWith = -1; int collidesWith = -1;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
//If the identifier of an existing track and the current track match, assume the are the same track and reject the negotiated one. //If the identifier of an existing track and the current track match, assume the are the same track and reject the negotiated one.
///\todo Maybe switch to a new form of detecting collisions, especially with regards to multiple audio languages and camera angles. ///\todo Maybe switch to a new form of detecting collisions, especially with regards to multiple audio languages and camera angles.
if (it->second.getIdentifier() == trackIdentifier) { if (it->second.getIdentifier() == trackIdentifier){
collidesWith = it->first; collidesWith = it->first;
break; break;
} }
@ -459,14 +486,14 @@ namespace Mist {
metaPages.erase(value); metaPages.erase(value);
//Check if the track collides, and whether the track it collides with is active. //Check if the track collides, and whether the track it collides with is active.
if (collidesWith != -1 && activeTracks.count(collidesWith)) {/*LTS*/ if (collidesWith != -1 && activeTracks.count(collidesWith)){/*LTS*/
//Print a warning message and set the state of the track to rejected. //Print a warning message and set the state of the track to rejected.
WARN_MSG("Collision of temporary track %lu with existing track %d detected. Handling as a new valid track.", value, collidesWith); WARN_MSG("Collision of temporary track %lu with existing track %d detected. Handling as a new valid track.", value, collidesWith);
collidesWith = -1; collidesWith = -1;
} }
/*LTS-START*/ /*LTS-START*/
unsigned long finalMap = collidesWith; unsigned long finalMap = collidesWith;
if (finalMap == -1) { if (finalMap == -1){
//No collision has been detected, assign a new final number //No collision has been detected, assign a new final number
finalMap = (myMeta.tracks.size() ? myMeta.tracks.rbegin()->first : 0) + 1; finalMap = (myMeta.tracks.size() ? myMeta.tracks.rbegin()->first : 0) + 1;
DEBUG_MSG(DLVL_DEVEL, "No colision detected for temporary track %lu from user %u, assigning final track number %lu", value, id, finalMap); DEBUG_MSG(DLVL_DEVEL, "No colision detected for temporary track %lu from user %u, assigning final track number %lu", value, id, finalMap);
@ -474,8 +501,8 @@ namespace Mist {
/*LTS-END*/ /*LTS-END*/
//Resume either if we have more than 1 keyframe on the replacement track (assume it was already pushing before the track "dissapeared") //Resume either if we have more than 1 keyframe on the replacement track (assume it was already pushing before the track "dissapeared")
//or if the firstms of the replacement track is later than the lastms on the existing track //or if the firstms of the replacement track is later than the lastms on the existing track
if (!myMeta.tracks.count(finalMap) || trackMeta.tracks.find(value)->second.keys.size() > 1 || trackMeta.tracks.find(value)->second.firstms >= myMeta.tracks[finalMap].lastms) { if (!myMeta.tracks.count(finalMap) || trackMeta.tracks.find(value)->second.keys.size() > 1 || trackMeta.tracks.find(value)->second.firstms >= myMeta.tracks[finalMap].lastms){
if (myMeta.tracks.count(finalMap) && myMeta.tracks[finalMap].lastms > 0) { if (myMeta.tracks.count(finalMap) && myMeta.tracks[finalMap].lastms > 0){
INFO_MSG("Resume of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); INFO_MSG("Resume of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
} else { } else {
INFO_MSG("New track detected, assigned track id %lu, coming from temporary track %lu of user %u", finalMap, value, id); INFO_MSG("New track detected, assigned track id %lu, coming from temporary track %lu of user %u", finalMap, value, id);
@ -485,6 +512,8 @@ namespace Mist {
INFO_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); INFO_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
myMeta.tracks.erase(finalMap); myMeta.tracks.erase(finalMap);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages //Set master to true before erasing the page, because we are responsible for cleaning up unused pages
updateMeta();
eraseTrackDataPages(value);
metaPages[finalMap].master = true; metaPages[finalMap].master = true;
metaPages.erase(finalMap); metaPages.erase(finalMap);
bufferLocations.erase(finalMap); bufferLocations.erase(finalMap);
@ -497,7 +526,7 @@ namespace Mist {
//Register the user thats is pushing this element //Register the user thats is pushing this element
pushLocation[finalMap] = data; pushLocation[finalMap] = data;
//Initialize the metadata for this track if it was not in place yet. //Initialize the metadata for this track if it was not in place yet.
if (!myMeta.tracks.count(finalMap)) { if (!myMeta.tracks.count(finalMap)){
DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap); DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap);
myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second; myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second;
myMeta.tracks[finalMap].trackID = finalMap; myMeta.tracks[finalMap].trackID = finalMap;
@ -516,14 +545,14 @@ namespace Mist {
updateMeta(); updateMeta();
} }
//If the track is active, and this is the element responsible for pushing it //If the track is active, and this is the element responsible for pushing it
if (activeTracks.count(value) && pushLocation[value] == data) { if (activeTracks.count(value) && pushLocation[value] == data){
//Open the track index page if we dont have it open yet //Open the track index page if we dont have it open yet
if (!metaPages.count(value) || !metaPages[value].mapped) { if (!metaPages.count(value) || !metaPages[value].mapped){
char firstPage[NAME_BUFFER_SIZE]; char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), value); snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), value);
metaPages[value].init(firstPage, 8192, false, false); metaPages[value].init(firstPage, 8192, false, false);
} }
if (metaPages[value].mapped) { if (metaPages[value].mapped){
//Update the metadata for this track //Update the metadata for this track
updateTrackMeta(value); updateTrackMeta(value);
} }
@ -531,20 +560,20 @@ namespace Mist {
} }
} }
void inputBuffer::updateTrackMeta(unsigned long tNum) { void inputBuffer::updateTrackMeta(unsigned long tNum){
//Store a reference for easier access //Store a reference for easier access
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum]; std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum];
//First detect all entries on metaPage //First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) { for (int i = 0; i < 8192; i += 8){
int * tmpOffset = (int *)(metaPages[tNum].mapped + i); int * tmpOffset = (int *)(metaPages[tNum].mapped + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0) { if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
continue; continue;
} }
unsigned long keyNum = ntohl(tmpOffset[0]); unsigned long keyNum = ntohl(tmpOffset[0]);
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)) { if (!locations.count(keyNum)){
locations[keyNum].curOffset = 0; locations[keyNum].curOffset = 0;
} }
locations[keyNum].pageNum = keyNum; locations[keyNum].pageNum = keyNum;
@ -552,19 +581,19 @@ namespace Mist {
} }
//Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest //Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest
for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) { for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++){
updateMetaFromPage(tNum, pageIt->first); updateMetaFromPage(tNum, pageIt->first);
} }
updateMeta(); updateMeta();
} }
void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum) { void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum){
DTSCPageData & pageData = bufferLocations[tNum][pageNum]; DTSCPageData & pageData = bufferLocations[tNum][pageNum];
//If the current page is over its 8mb "splitting" boundary //If the current page is over its 8mb "splitting" boundary
if (pageData.curOffset > (8 * 1024 * 1024)) { if (pageData.curOffset > (8 * 1024 * 1024)){
//And the last keyframe in the parsed metadata is further in the stream than this page //And the last keyframe in the parsed metadata is further in the stream than this page
if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()) { if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()){
//Assume the entire page is already parsed //Assume the entire page is already parsed
return; return;
} }
@ -573,14 +602,14 @@ namespace Mist {
//Otherwise open and parse the page //Otherwise open and parse the page
//Open the page if it is not yet open //Open the page if it is not yet open
if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum) { if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum){
//DO NOT ERASE THE PAGE HERE, master is not set to true //DO NOT ERASE THE PAGE HERE, master is not set to true
curPageNum.erase(tNum); curPageNum.erase(tNum);
char nextPageName[NAME_BUFFER_SIZE]; char nextPageName[NAME_BUFFER_SIZE];
snprintf(nextPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tNum, pageNum); snprintf(nextPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tNum, pageNum);
curPage[tNum].init(nextPageName, 20971520); curPage[tNum].init(nextPageName, 20971520);
//If the page can not be opened, stop here //If the page can not be opened, stop here
if (!curPage[tNum].mapped) { if (!curPage[tNum].mapped){
WARN_MSG("Could not open page: %s", nextPageName); WARN_MSG("Could not open page: %s", nextPageName);
return; return;
} }
@ -591,16 +620,16 @@ namespace Mist {
DTSC::Packet tmpPack; DTSC::Packet tmpPack;
tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0); tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0);
//No new data has been written on the page since last update //No new data has been written on the page since last update
if (!tmpPack) { if (!tmpPack){
return; return;
} }
lastUpdated[tNum] = Util::bootSecs(); lastUpdated[tNum] = Util::bootSecs();
while (tmpPack) { while (tmpPack){
//Update the metadata with this packet //Update the metadata with this packet
///\todo Why is there an LTS tag here? ///\todo Why is there an LTS tag here?
myMeta.update(tmpPack, segmentSize);/*LTS*/ myMeta.update(tmpPack, segmentSize);/*LTS*/
//Set the first time when appropriate //Set the first time when appropriate
if (pageData.firstTime == 0) { if (pageData.firstTime == 0){
pageData.firstTime = tmpPack.getTime(); pageData.firstTime = tmpPack.getTime();
} }
//Update the offset on the page with the size of the current packet //Update the offset on the page with the size of the current packet
@ -610,7 +639,7 @@ namespace Mist {
} }
} }
bool inputBuffer::setup() { bool inputBuffer::setup(){
lastReTime = Util::epoch(); /*LTS*/ lastReTime = Util::epoch(); /*LTS*/
std::string strName = config->getString("streamname"); std::string strName = config->getString("streamname");
Util::sanitizeName(strName); Util::sanitizeName(strName);
@ -622,10 +651,10 @@ namespace Mist {
long long tmpNum; long long tmpNum;
//if stream is configured and setting is present, use it, always //if stream is configured and setting is present, use it, always
if (streamCfg && streamCfg.getMember("DVR")) { if (streamCfg && streamCfg.getMember("DVR")){
tmpNum = streamCfg.getMember("DVR").asInt(); tmpNum = streamCfg.getMember("DVR").asInt();
} else { } else {
if (streamCfg) { if (streamCfg){
//otherwise, if stream is configured use the default //otherwise, if stream is configured use the default
tmpNum = config->getOption("bufferTime", true)[0u].asInt(); tmpNum = config->getOption("bufferTime", true)[0u].asInt();
} else { } else {
@ -634,17 +663,17 @@ namespace Mist {
} }
} }
//if the new value is different, print a message and apply it //if the new value is different, print a message and apply it
if (bufferTime != tmpNum) { if (bufferTime != tmpNum){
DEBUG_MSG(DLVL_DEVEL, "Setting bufferTime from %u to new value of %lli", bufferTime, tmpNum); DEBUG_MSG(DLVL_DEVEL, "Setting bufferTime from %u to new value of %lli", bufferTime, tmpNum);
bufferTime = tmpNum; bufferTime = tmpNum;
} }
/*LTS-START*/ /*LTS-START*/
//if stream is configured and setting is present, use it, always //if stream is configured and setting is present, use it, always
if (streamCfg && streamCfg.getMember("cut")) { if (streamCfg && streamCfg.getMember("cut")){
tmpNum = streamCfg.getMember("cut").asInt(); tmpNum = streamCfg.getMember("cut").asInt();
} else { } else {
if (streamCfg) { if (streamCfg){
//otherwise, if stream is configured use the default //otherwise, if stream is configured use the default
tmpNum = config->getOption("cut", true)[0u].asInt(); tmpNum = config->getOption("cut", true)[0u].asInt();
} else { } else {
@ -653,17 +682,17 @@ namespace Mist {
} }
} }
//if the new value is different, print a message and apply it //if the new value is different, print a message and apply it
if (cutTime != tmpNum) { if (cutTime != tmpNum){
DEBUG_MSG(DLVL_DEVEL, "Setting cutTime from %u to new value of %lli", cutTime, tmpNum); DEBUG_MSG(DLVL_DEVEL, "Setting cutTime from %u to new value of %lli", cutTime, tmpNum);
cutTime = tmpNum; cutTime = tmpNum;
} }
//if stream is configured and setting is present, use it, always //if stream is configured and setting is present, use it, always
if (streamCfg && streamCfg.getMember("segmentsize")) { if (streamCfg && streamCfg.getMember("segmentsize")){
tmpNum = streamCfg.getMember("segmentsize").asInt(); tmpNum = streamCfg.getMember("segmentsize").asInt();
} else { } else {
if (streamCfg) { if (streamCfg){
//otherwise, if stream is configured use the default //otherwise, if stream is configured use the default
tmpNum = config->getOption("segmentsize", true)[0u].asInt(); tmpNum = config->getOption("segmentsize", true)[0u].asInt();
} else { } else {
@ -672,7 +701,7 @@ namespace Mist {
} }
} }
//if the new value is different, print a message and apply it //if the new value is different, print a message and apply it
if (segmentSize != tmpNum) { if (segmentSize != tmpNum){
DEBUG_MSG(DLVL_DEVEL, "Setting segmentSize from %u to new value of %lli", segmentSize, tmpNum); DEBUG_MSG(DLVL_DEVEL, "Setting segmentSize from %u to new value of %lli", segmentSize, tmpNum);
segmentSize = tmpNum; segmentSize = tmpNum;
} }
@ -680,10 +709,10 @@ namespace Mist {
/* /*
//if stream is configured and setting is present, use it, always //if stream is configured and setting is present, use it, always
std::string rec; std::string rec;
if (streamCfg && streamCfg.getMember("record")) { if (streamCfg && streamCfg.getMember("record")){
rec = streamCfg.getMember("record").asInt(); rec = streamCfg.getMember("record").asInt();
} else { } else {
if (streamCfg) { if (streamCfg){
//otherwise, if stream is configured use the default //otherwise, if stream is configured use the default
rec = config->getOption("record", true)[0u].asString(); rec = config->getOption("record", true)[0u].asString();
} else { } else {
@ -692,17 +721,17 @@ namespace Mist {
} }
} }
//if the new value is different, print a message and apply it //if the new value is different, print a message and apply it
if (recName != rec) { if (recName != rec){
//close currently recording file, for we should open a new one //close currently recording file, for we should open a new one
DEBUG_MSG(DLVL_DEVEL, "Stopping recording of %s to %s", config->getString("streamname").c_str(), recName.c_str()); DEBUG_MSG(DLVL_DEVEL, "Stopping recording of %s to %s", config->getString("streamname").c_str(), recName.c_str());
recFile.close(); recFile.close();
recMeta.tracks.clear(); recMeta.tracks.clear();
recName = rec; recName = rec;
} }
if (recName != "" && !recFile.is_open()) { if (recName != "" && !recFile.is_open()){
DEBUG_MSG(DLVL_DEVEL, "Starting recording of %s to %s", config->getString("streamname").c_str(), recName.c_str()); DEBUG_MSG(DLVL_DEVEL, "Starting recording of %s to %s", config->getString("streamname").c_str(), recName.c_str());
recFile.open(recName.c_str()); recFile.open(recName.c_str());
if (recFile.fail()) { if (recFile.fail()){
DEBUG_MSG(DLVL_DEVEL, "Error occured during record opening: %s", strerror(errno)); DEBUG_MSG(DLVL_DEVEL, "Error occured during record opening: %s", strerror(errno));
} }
recBpos = 0; recBpos = 0;
@ -714,15 +743,15 @@ namespace Mist {
return true; return true;
} }
bool inputBuffer::readHeader() { bool inputBuffer::readHeader(){
return true; return true;
} }
void inputBuffer::getNext(bool smart) {} void inputBuffer::getNext(bool smart){}
void inputBuffer::seek(int seekTime) {} void inputBuffer::seek(int seekTime){}
void inputBuffer::trackSelect(std::string trackSpec) {} void inputBuffer::trackSelect(std::string trackSpec){}
} }

View file

@ -26,11 +26,13 @@ namespace Mist {
void trackSelect(std::string trackSpec); void trackSelect(std::string trackSpec);
bool removeKey(unsigned int tid); bool removeKey(unsigned int tid);
void removeUnused(); void removeUnused();
void eraseTrackDataPages(unsigned long tid);
void finish(); void finish();
void userCallback(char * data, size_t len, unsigned int id); void userCallback(char * data, size_t len, unsigned int id);
std::set<unsigned long> negotiatingTracks; std::set<unsigned long> negotiatingTracks;
std::set<unsigned long> activeTracks; std::set<unsigned long> activeTracks;
std::map<unsigned long, unsigned long long> lastUpdated; std::map<unsigned long, unsigned long long> lastUpdated;
std::map<unsigned long, unsigned long long> negotiationTimeout;
///Maps trackid to a pagenum->pageData map ///Maps trackid to a pagenum->pageData map
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > bufferLocations; std::map<unsigned long, std::map<unsigned long, DTSCPageData> > bufferLocations;
std::map<unsigned long, char *> pushLocation; std::map<unsigned long, char *> pushLocation;