Working multi-input

This commit is contained in:
Erik Zandvliet 2015-04-02 09:56:47 +02:00
parent 9b6312ca01
commit d370ef4eac
31 changed files with 1264 additions and 690 deletions

View file

@ -11,7 +11,7 @@ macro(makeAnalyser analyserName format)
endmacro()
macro(makeInput inputName format)
add_executable( MistIn${inputName} src/input/mist_in.cpp src/input/input.cpp src/input/input_${format}.cpp )
add_executable( MistIn${inputName} src/input/mist_in.cpp src/input/input.cpp src/input/input_${format}.cpp src/io.cpp)
set_target_properties( MistIn${inputName} PROPERTIES COMPILE_DEFINITIONS INPUTTYPE=\"input_${format}.h\")
target_link_libraries( MistIn${inputName} mist )
endmacro()
@ -28,7 +28,7 @@ macro(makeOutput outputName format)
if (";${ARGN};" MATCHES ";ts;")
SET(tsOutput src/output/output_ts_base.cpp)
endif()
add_executable( MistOut${outputName} src/output/mist_out.cpp src/output/output.cpp ${httpOutput} ${tsOutput} src/output/output_${format}.cpp )
add_executable( MistOut${outputName} src/output/mist_out.cpp src/output/output.cpp ${httpOutput} ${tsOutput} src/output/output_${format}.cpp src/io.cpp)
set_target_properties( MistOut${outputName} PROPERTIES COMPILE_DEFINITIONS "OUTPUTTYPE=\"output_${format}.h\";TS_BASECLASS=${tsBaseClass}")
target_link_libraries( MistOut${outputName} mist )
endmacro()
@ -36,8 +36,6 @@ endmacro()
set(lspSOURCES lsp/plugins/md5.js lsp/plugins/cattablesort.js lsp/mist.js)
set(lspSOURCESmin lsp/plugins/jquery.js lsp/plugins/jquery.flot.min.js lsp/plugins/jquery.flot.time.min.js lsp/plugins/jquery.qrcode.min.js)
set(lspDATA lsp/header.html lsp/main.css lsp/footer.html)
SET(SOURCE_DIR ${PROJECT_SOURCE_DIR})
SET(BINARY_DIR ${PROJECT_BINARY_DIR})
include_directories(${SOURCE_DIR})
@ -81,9 +79,7 @@ set(PACKAGE_VERSION \"${PACKAGE_VERSION}\" )
if (NOT DEBUG)
set(DEBUG 4)
endif()
message("Builing release ${RELEASE} for version ${PACKAGE_VERSION} @ debug level ${DEBUG}")
add_definitions(-g -funsigned-char -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -DDEBUG=${DEBUG} -DPACKAGE_VERSION=${PACKAGE_VERSION} -DRELEASE=${RELEASE})
if (NOT DEFINED ${NOSHM} )
add_definitions(-DSHM_ENABLED=1)
@ -104,7 +100,6 @@ if(DOXYGEN_FOUND)
VERBATIM
)
endif(DOXYGEN_FOUND)
#Compile the lib
set(libHeaders
${SOURCE_DIR}/lib/config.h
@ -229,7 +224,7 @@ add_custom_target( localSettingsPage
VERBATIM
)
add_executable( MistOutHTTP src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_http_internal.cpp)
add_executable( MistOutHTTP src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_http_internal.cpp src/io.cpp)
set_target_properties( MistOutHTTP PROPERTIES COMPILE_DEFINITIONS "OUTPUTTYPE=\"output_http_internal.h\"")
add_dependencies(MistOutHTTP embedcode)
target_link_libraries( MistOutHTTP mist )

View file

@ -76,116 +76,116 @@ MistInfo: src/analysers/info.cpp
inputs: MistInDTSC
MistInDTSC: override LDLIBS += $(THREADLIB)
MistInDTSC: override CPPFLAGS += "-DINPUTTYPE=\"input_dtsc.h\""
MistInDTSC: src/input/mist_in.cpp src/input/input.cpp src/input/input_dtsc.cpp
MistInDTSC: src/input/mist_in.cpp src/input/input.cpp src/input/input_dtsc.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
inputs: MistInMP3
MistInMP3: override LDLIBS += $(THREADLIB)
MistInMP3: override CPPFLAGS += "-DINPUTTYPE=\"input_mp3.h\""
MistInMP3: src/input/mist_in.cpp src/input/input.cpp src/input/input_mp3.cpp
MistInMP3: src/input/mist_in.cpp src/input/input.cpp src/input/input_mp3.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
inputs: MistInFLV
MistInFLV: override LDLIBS += $(THREADLIB)
MistInFLV: override CPPFLAGS += "-DINPUTTYPE=\"input_flv.h\""
MistInFLV: src/input/mist_in.cpp src/input/input.cpp src/input/input_flv.cpp
MistInFLV: src/input/mist_in.cpp src/input/input.cpp src/input/input_flv.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
inputs: MistInOGG
MistInOGG: override LDLIBS += $(THREADLIB)
MistInOGG: override CPPFLAGS += "-DINPUTTYPE=\"input_ogg.h\""
MistInOGG: src/input/mist_in.cpp src/input/input.cpp src/input/input_ogg.cpp
MistInOGG: src/input/mist_in.cpp src/input/input.cpp src/input/input_ogg.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
inputs: MistInBuffer
MistInBuffer: override LDLIBS += $(THREADLIB)
MistInBuffer: override CPPFLAGS += "-DINPUTTYPE=\"input_buffer.h\""
MistInBuffer: src/input/mist_in.cpp src/input/input.cpp src/input/input_buffer.cpp
MistInBuffer: src/input/mist_in.cpp src/input/input.cpp src/input/input_buffer.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutFLV
MistOutFLV: override LDLIBS += $(THREADLIB)
MistOutFLV: override CPPFLAGS += "-DOUTPUTTYPE=\"output_progressive_flv.h\""
MistOutFLV: src/output/mist_out.cpp src/output/output_http.cpp src/output/output.cpp src/output/output_progressive_flv.cpp
MistOutFLV: src/output/mist_out.cpp src/output/output_http.cpp src/output/output.cpp src/output/output_progressive_flv.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutOGG
MistOutOGG: override LDLIBS += $(THREADLIB)
MistOutOGG: override LDLIBS += $(GEOIP)
MistOutOGG: override CPPFLAGS += "-DOUTPUTTYPE=\"output_progressive_ogg.h\""
MistOutOGG: src/output/mist_out.cpp src/output/output_http.cpp src/output/output.cpp src/output/output_progressive_ogg.cpp
MistOutOGG: src/output/mist_out.cpp src/output/output_http.cpp src/output/output.cpp src/output/output_progressive_ogg.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutMP4
MistOutMP4: override LDLIBS += $(THREADLIB)
MistOutMP4: override CPPFLAGS += "-DOUTPUTTYPE=\"output_progressive_mp4.h\""
MistOutMP4: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_progressive_mp4.cpp
MistOutMP4: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_progressive_mp4.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutMP3
MistOutMP3: override LDLIBS += $(THREADLIB)
MistOutMP3: override CPPFLAGS += "-DOUTPUTTYPE=\"output_progressive_mp3.h\""
MistOutMP3: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_progressive_mp3.cpp
MistOutMP3: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_progressive_mp3.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutRTMP
MistOutRTMP: override LDLIBS += $(THREADLIB)
MistOutRTMP: override CPPFLAGS += "-DOUTPUTTYPE=\"output_rtmp.h\""
MistOutRTMP: src/output/mist_out.cpp src/output/output.cpp src/output/output_rtmp.cpp
MistOutRTMP: src/output/mist_out.cpp src/output/output.cpp src/output/output_rtmp.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutRaw
MistOutRaw: override LDLIBS += $(THREADLIB)
MistOutRaw: override CPPFLAGS += "-DOUTPUTTYPE=\"output_raw.h\""
MistOutRaw: src/output/mist_out.cpp src/output/output.cpp src/output/output_raw.cpp
MistOutRaw: src/output/mist_out.cpp src/output/output.cpp src/output/output_raw.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutHTTPTS
MistOutHTTPTS: override LDLIBS += $(THREADLIB)
MistOutHTTPTS: override CPPFLAGS += -DOUTPUTTYPE=\"output_httpts.h\" -DTS_BASECLASS=HTTPOutput
MistOutHTTPTS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_httpts.cpp src/output/output_ts_base.cpp
MistOutHTTPTS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_httpts.cpp src/output/output_ts_base.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutTS
MistOutTS: override LDLIBS += $(THREADLIB)
MistOutTS: override CPPFLAGS += "-DOUTPUTTYPE=\"output_ts.h\""
MistOutTS: src/output/mist_out.cpp src/output/output.cpp src/output/output_ts.cpp src/output/output_ts_base.cpp
MistOutTS: src/output/mist_out.cpp src/output/output.cpp src/output/output_ts.cpp src/output/output_ts_base.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutHTTP
MistOutHTTP: override LDLIBS += $(THREADLIB)
MistOutHTTP: override CPPFLAGS += "-DOUTPUTTYPE=\"output_http_internal.h\""
MistOutHTTP: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_http_internal.cpp src/embed.js.h
MistOutHTTP: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_http_internal.cpp src/embed.js.h src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_http_internal.cpp $(LDLIBS) -o $@
outputs: MistOutHSS
MistOutHSS: override LDLIBS += $(THREADLIB)
MistOutHSS: override CPPFLAGS += "-DOUTPUTTYPE=\"output_hss.h\""
MistOutHSS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hss.cpp
MistOutHSS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hss.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutHLS
MistOutHLS: override LDLIBS += $(THREADLIB)
MistOutHLS: override CPPFLAGS += -DOUTPUTTYPE=\"output_hls.h\" -DTS_BASECLASS=HTTPOutput
MistOutHLS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hls.cpp src/output/output_ts_base.cpp
MistOutHLS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hls.cpp src/output/output_ts_base.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutHDS
MistOutHDS: override LDLIBS += $(THREADLIB)
MistOutHDS: override CPPFLAGS += "-DOUTPUTTYPE=\"output_hds.h\""
MistOutHDS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hds.cpp
MistOutHDS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hds.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutSRT
MistOutSRT: override LDLIBS += $(THREADLIB)
MistOutSRT: override CPPFLAGS += "-DOUTPUTTYPE=\"output_srt.h\""
MistOutSRT: src/output/mist_out.cpp src/output/output_http.cpp src/output/output.cpp src/output/output_srt.cpp
MistOutSRT: src/output/mist_out.cpp src/output/output_http.cpp src/output/output.cpp src/output/output_srt.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutJSON
MistOutJSON: override LDLIBS += $(THREADLIB)
MistOutJSON: override CPPFLAGS += "-DOUTPUTTYPE=\"output_json.h\""
MistOutJSON: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_json.cpp
MistOutJSON: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_json.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
lspSOURCES=lsp/plugins/md5.js lsp/plugins/cattablesort.js lsp/mist.js

View file

@ -1029,7 +1029,7 @@ bool DTSC::File::seek_time(unsigned int ms, unsigned int trackNo, bool forceSeek
bool DTSC::File::seek_time(unsigned int ms) {
currentPositions.clear();
if (selectedTracks.size()) {
for (std::set<unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) {
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) {
seek_time(ms, (*it), true);
}
}
@ -1077,7 +1077,7 @@ bool DTSC::File::atKeyframe() {
return false;
}
void DTSC::File::selectTracks(std::set<unsigned int> & tracks) {
void DTSC::File::selectTracks(std::set<unsigned long> & tracks) {
selectedTracks = tracks;
currentPositions.clear();
seek_time(0);

View file

@ -353,7 +353,7 @@ namespace DTSC {
void writePacket(std::string & newPacket);
void writePacket(JSON::Value & newPacket);
bool atKeyframe();
void selectTracks(std::set<unsigned int> & tracks);
void selectTracks(std::set<unsigned long> & tracks);
private:
long int endPos;
void readHeader(int pos);
@ -369,7 +369,7 @@ namespace DTSC {
void * buffer;
bool created;
std::set<seekPos> currentPositions;
std::set<unsigned int> selectedTracks;
std::set<unsigned long> selectedTracks;
};
//FileWriter

View file

@ -85,7 +85,7 @@ bool Controller::sessIndex::operator>= (const Controller::sessIndex &b) const{
/// old statistics that have disconnected over 10 minutes ago.
void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer("statistics", STAT_EX_SIZE, true);
IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true);
while(((Util::Config*)config)->is_active){
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);

View file

@ -59,7 +59,9 @@ namespace Controller {
//push-style stream
if (hasViewers(name)){
data["meta"].null();
IPC::sharedPage streamIndex(name, DEFAULT_META_PAGE_SIZE, false, false);
char streamPageName[NAME_BUFFER_SIZE];
snprintf(streamPageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, name.c_str());
IPC::sharedPage streamIndex(streamPageName, DEFAULT_META_PAGE_SIZE, false, false);
if (!streamIndex.mapped){
return;
}

View file

@ -21,13 +21,14 @@ namespace Mist {
}
}
void Input::doNothing(char * data, size_t len, unsigned int id){
DEBUG_MSG(DLVL_DONTEVEN, "Doing 'nothing'");
void Input::callbackWrapper(char * data, size_t len, unsigned int id){
singleton->userCallback(data, 30, id);//call the userCallback for this input
}
Input::Input(Util::Config * cfg){
Input::Input(Util::Config * cfg) : InOutBase() {
config = cfg;
standAlone = true;
JSON::Value option;
option["long"] = "json";
option["short"] = "j";
@ -97,8 +98,9 @@ namespace Mist {
}
}
int Input::run(){
if (config->getBool("json")){
int Input::run() {
streamName = config->getString("streamname");
if (config->getBool("json")) {
std::cout << capa.toString() << std::endl;
return 0;
}
@ -127,10 +129,10 @@ namespace Mist {
long long int bpos = 0;
seek(0);
getNext();
while (lastPack){
newMeta.updatePosOverride(lastPack, bpos);
file.write(lastPack.getData(), lastPack.getDataLen());
bpos += lastPack.getDataLen();
while (thisPacket){
newMeta.updatePosOverride(thisPacket, bpos);
file.write(thisPacket.getData(), thisPacket.getDataLen());
bpos += thisPacket.getDataLen();
getNext();
}
//close file
@ -143,12 +145,9 @@ namespace Mist {
DEBUG_MSG(DLVL_FAIL,"No filename specified, exiting");
}
}else{
//after this player functionality
metaPage.init(config->getString("streamname"), (isBuffer ? DEFAULT_META_PAGE_SIZE : myMeta.getSendLen()), true);
myMeta.writeTo(metaPage.mapped);
userPage.init(config->getString("streamname") + "_users", PLAY_EX_SIZE, true);
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
userPage.init(userPageName, PLAY_EX_SIZE, true);
if (!isBuffer){
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
bufferFrame(it->first, 1);
@ -161,7 +160,7 @@ namespace Mist {
while ((Util::bootSecs() - activityCounter) < 10){//10 second timeout
Util::wait(1000);
removeUnused();
userPage.parseEach(doNothing);
userPage.parseEach(callbackWrapper);
if (userPage.amount){
activityCounter = Util::bootSecs();
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount);
@ -169,12 +168,27 @@ namespace Mist {
DEBUG_MSG(DLVL_INSANE, "Timer running");
}
}
finish();
DEBUG_MSG(DLVL_DEVEL,"Closing clean");
//end player functionality
}
return 0;
}
void Input::finish(){
for( std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){
for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){
it2->second = 1;
}
}
removeUnused();
if (standAlone){
for (std::map<unsigned long, IPC::sharedPage>::iterator it = metaPages.begin(); it != metaPages.end(); it++){
it->second.master = true;
}
}
}
void Input::removeUnused(){
for (std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){
for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){
@ -185,12 +199,12 @@ namespace Mist {
change = false;
for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){
if (!it2->second){
dataPages[it->first].erase(it2->first);
bufferRemove(it->first, it2->first);
pageCounter[it->first].erase(it2->first);
for (int i = 0; i < 8192; i += 8){
unsigned int thisKeyNum = ntohl(((((long long int *)(indexPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
unsigned int thisKeyNum = ntohl(((((long long int *)(metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
if (thisKeyNum == it2->first){
(((long long int *)(indexPages[it->first].mapped + i))[0]) = 0;
(((long long int *)(metaPages[it->first].mapped + i))[0]) = 0;
}
}
change = true;
@ -227,10 +241,6 @@ namespace Mist {
}
if (hasKeySizes){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
char tmpId[20];
sprintf(tmpId, "%u", it->first);
DEBUG_MSG(DLVL_HIGH, "Making page %s", std::string(config->getString("streamname") + tmpId).c_str());
indexPages[it->first].init(config->getString("streamname") + tmpId, 8 * 1024, true);//Pages of 8kb in size, room for 512 parts.
bool newData = true;
for (int i = 0; i < it->second.keys.size(); i++){
if (newData){
@ -253,8 +263,8 @@ namespace Mist {
seek(0);
getNext();
while(lastPack){//loop through all
unsigned int tid = lastPack.getTrackId();
while(thisPacket){//loop through all
unsigned int tid = thisPacket.getTrackId();
if (!tid){
getNext(false);
continue;
@ -271,9 +281,6 @@ namespace Mist {
curData[tid].curOffset = 0;
curData[tid].firstTime = myMeta.tracks[tid].keys[0].getTime();
char tmpId[80];
snprintf(tmpId, 80, "%s%u", config->getString("streamname").c_str(), tid);
indexPages[tid].init(tmpId, 8 * 1024, true);//Pages of 8kb in size, room for 512 parts.
}
if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum){
if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) {
@ -287,10 +294,10 @@ namespace Mist {
curData[tid].keyNum++;
curData[tid].partNum = 0;
}
curData[tid].dataSize += lastPack.getDataLen();
curData[tid].dataSize += thisPacket.getDataLen();
curData[tid].partNum ++;
bookKeeping[tid].curPart ++;
DEBUG_MSG(DLVL_DONTEVEN, "Track %ld:%llu on page %d@%llu (len:%d), being part %d of key %d", lastPack.getTrackId(), lastPack.getTime(), bookKeeping[tid].first, curData[tid].dataSize, lastPack.getDataLen(), curData[tid].partNum, bookKeeping[tid].first+curData[tid].keyNum);
DEBUG_MSG(DLVL_DONTEVEN, "Track %ld:%llu on page %d@%llu (len:%d), being part %lu of key %lu", thisPacket.getTrackId(), thisPacket.getTime(), bookKeeping[tid].first, curData[tid].dataSize, thisPacket.getDataLen(), curData[tid].partNum, bookKeeping[tid].first+curData[tid].keyNum);
getNext(false);
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
@ -304,8 +311,8 @@ namespace Mist {
DEBUG_MSG(DLVL_WARN, "No pages for track %d found", it->first);
}else{
DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size());
for (std::map<int, DTSCPageData>::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++){
DEBUG_MSG(DLVL_VERYHIGH, "Page %u-%u, (%llu bytes)", it2->first, it2->first + it2->second.keyNum - 1, it2->second.dataSize);
for (std::map<unsigned long, DTSCPageData>::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++){
DEBUG_MSG(DLVL_VERYHIGH, "Page %lu-%lu, (%llu bytes)", it2->first, it2->first + it2->second.keyNum - 1, it2->second.dataSize);
}
}
}
@ -313,79 +320,71 @@ namespace Mist {
bool Input::bufferFrame(unsigned int track, unsigned int keyNum){
if (keyNum >= myMeta.tracks[track].keys.size()){
//End of movie here, returning true to avoid various error messages
return true;
}
if (keyNum < 1){keyNum = 1;}
if (isBuffered(track, keyNum)){
//get corresponding page number
int pageNumber = 0;
for (std::map<unsigned long, DTSCPageData>::iterator it = pagesByTrack[track].begin(); it != pagesByTrack[track].end(); it++){
if (it->first <= keyNum){
pageNumber = it->first;
}else{
break;
}
}
pageCounter[track][pageNumber] = 15;
return true;
}
if (!pagesByTrack.count(track)){
return false;
}
std::map<int, DTSCPageData>::iterator it = pagesByTrack[track].upper_bound(keyNum);
if (it != pagesByTrack[track].begin()){
it--;
//Update keynum to point to the corresponding page
INFO_MSG("Updating keynum %u to %lu", keyNum, (--(pagesByTrack[track].upper_bound(keyNum)))->first);
keyNum = (--(pagesByTrack[track].upper_bound(keyNum)))->first;
if (!bufferStart(track, keyNum)){
return false;
}
unsigned int pageNum = it->first;
pageCounter[track][pageNum] = 15;///Keep page 15seconds in memory after last use
DEBUG_MSG(DLVL_DONTEVEN, "Attempting to buffer page %u key %d->%d", track, keyNum, pageNum);
if (dataPages[track].count(pageNum)){
return true;
}
char pageId[100];
int pageIdLen = snprintf(pageId, 100, "%s%u_%u", config->getString("streamname").c_str(), track, pageNum);
std::string tmpString(pageId, pageIdLen);
dataPages[track][pageNum].init(tmpString, it->second.dataSize, true);
DEBUG_MSG(DLVL_HIGH, "Buffering track %u page %u through %u datasize: %llu", track, pageNum, pageNum-1 + it->second.keyNum, it->second.dataSize);
std::stringstream trackSpec;
trackSpec << track;
trackSelect(trackSpec.str());
unsigned int keyIndex = pageNum-1;
//if (keyIndex > 0){++keyIndex;}
long long unsigned int startTime = myMeta.tracks[track].keys[keyIndex].getTime();
seek(myMeta.tracks[track].keys[keyNum - 1].getTime());
long long unsigned int stopTime = myMeta.tracks[track].lastms + 1;
if ((int)myMeta.tracks[track].keys.size() > keyIndex + it->second.keyNum){
stopTime = myMeta.tracks[track].keys[keyIndex + it->second.keyNum].getTime();
if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + pagesByTrack[track][keyNum].keyNum){
stopTime = myMeta.tracks[track].keys[keyNum - 1 + pagesByTrack[track][keyNum].keyNum].getTime();
}
DEBUG_MSG(DLVL_HIGH, "Buffering track %d from %d (%llus) to %d (%llus)", track, pageNum, startTime, pageNum-1 + it->second.keyNum, stopTime);
seek(startTime);
it->second.curOffset = 0;
DEBUG_MSG(DLVL_HIGH, "Playing from %llu to %llu", myMeta.tracks[track].keys[keyNum - 1].getTime(), stopTime);
getNext();
//in case earlier seeking was inprecise, seek to the exact point
while (lastPack && lastPack.getTime() < startTime){
while (thisPacket && thisPacket.getTime() < (unsigned long long)myMeta.tracks[track].keys[keyNum - 1].getTime()){
getNext();
}
while (lastPack && lastPack.getTime() < stopTime){
if (it->second.curOffset + lastPack.getDataLen() > pagesByTrack[track][pageNum].dataSize){
DEBUG_MSG(DLVL_WARN, "Trying to write %u bytes on pos %llu where size is %llu (time: %llu / %llu, track %u page %u)", lastPack.getDataLen(), it->second.curOffset, pagesByTrack[track][pageNum].dataSize, lastPack.getTime(), stopTime, track, pageNum);
break;
}else{
// DEBUG_MSG(DLVL_WARN, "Writing %u bytes on pos %llu where size is %llu (time: %llu / %llu, track %u page %u)", lastPack.getDataLen(), it->second.curOffset, pagesByTrack[track][pageNum].dataSize, lastPack.getTime(), stopTime, track, pageNum);
memcpy(dataPages[track][pageNum].mapped + it->second.curOffset, lastPack.getData(), lastPack.getDataLen());
it->second.curOffset += lastPack.getDataLen();
}
while (thisPacket && thisPacket.getTime() < stopTime){
bufferNext(thisPacket);
getNext();
}
for (int i = 0; i < indexPages[track].len / 8; i++){
if (((long long int*)indexPages[track].mapped)[i] == 0){
((long long int*)indexPages[track].mapped)[i] = (((long long int)htonl(pageNum)) << 32) | htonl(it->second.keyNum);
break;
}
}
DEBUG_MSG(DLVL_HIGH, "Done buffering page %u for track %u", pageNum, track);
bufferFinalize(track);
DEBUG_MSG(DLVL_DEVEL, "Done buffering page %d for track %d", keyNum, track);
pageCounter[track][keyNum] = 15;
return true;
}
bool Input::atKeyFrame(){
static std::map<int, unsigned long long> lastSeen;
//not in keyTimes? We're not at a keyframe.
unsigned int c = keyTimes[lastPack.getTrackId()].count(lastPack.getTime());
unsigned int c = keyTimes[thisPacket.getTrackId()].count(thisPacket.getTime());
if (!c){
return false;
}
//skip double times
if (lastSeen.count(lastPack.getTrackId()) && lastSeen[lastPack.getTrackId()] == lastPack.getTime()){
if (lastSeen.count(thisPacket.getTrackId()) && lastSeen[thisPacket.getTrackId()] == thisPacket.getTime()){
return false;
}
//set last seen, and return true
lastSeen[lastPack.getTrackId()] = lastPack.getTime();
lastSeen[thisPacket.getTrackId()] = thisPacket.getTime();
return true;
}

View file

@ -7,35 +7,28 @@
#include <mist/dtsc.h>
#include <mist/shared_memory.h>
namespace Mist {
struct DTSCPageData {
DTSCPageData() : keyNum(0), partNum(0), dataSize(0), curOffset(0), firstTime(0){}
int keyNum;///<The number of keyframes in this page.
int partNum;///<The number of parts in this page.
unsigned long long int dataSize;///<The full size this page should be.
unsigned long long int curOffset;///<The current write offset in the page.
unsigned long long int firstTime;///<The first timestamp of the page.
unsigned long lastKeyTime;///<The last key time encountered on this track.
};
#include "../io.h"
namespace Mist {
struct booking {
int first;
int curKey;
int curPart;
};
class Input {
class Input : public InOutBase {
public:
Input(Util::Config * cfg);
int run();
virtual int run();
virtual ~Input() {};
protected:
static void doNothing(char * data, size_t len, unsigned int id);
static void callbackWrapper(char * data, size_t len, unsigned int id);
virtual bool setup() = 0;
virtual bool readHeader() = 0;
virtual bool atKeyFrame();
virtual void getNext(bool smart = true) {};
virtual void seek(int seekTime){};
virtual void finish();
void play(int until = 0);
void playOnce();
void quitPlay();
@ -53,28 +46,16 @@ namespace Mist {
int playing;
unsigned int playUntil;
unsigned int benchMark;
std::set<unsigned int> selectedTracks;
bool isBuffer;
Util::Config * config;
JSON::Value capa;
DTSC::Meta myMeta;
DTSC::Packet lastPack;
std::map<int,std::set<int> > keyTimes;
IPC::sharedPage metaPage;
//Create server for user pages
IPC::sharedServer userPage;
//TrackIndex pages
std::map<int, IPC::sharedPage> indexPages;
std::map<int, std::map<int, IPC::sharedPage> > dataPages;
//Page Overview
std::map<int, std::map<int, DTSCPageData> > pagesByTrack;
std::map<unsigned int, std::map<unsigned int, unsigned int> > pageCounter;
static Input * singleton;

View file

@ -12,6 +12,10 @@
#include "input_buffer.h"
#ifndef TIMEOUTMULTIPLIER
#define TIMEOUTMULTIPLIER 10
#endif
namespace Mist {
inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg) {
capa["name"] = "Buffer";
@ -37,49 +41,57 @@ namespace Mist {
singleton = this;
bufferTime = 0;
cutTime = 0;
}
inputBuffer::~inputBuffer(){
if (myMeta.tracks.size()){
inputBuffer::~inputBuffer() {
config->is_active = false;
if (myMeta.tracks.size()) {
DEBUG_MSG(DLVL_DEVEL, "Cleaning up, removing last keyframes");
for(std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
while (removeKey(it->first)){}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
while (removeKey(it->first)) {}
}
}
}
void inputBuffer::updateMeta(){
void inputBuffer::updateMeta() {
long long unsigned int firstms = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int lastms = 0;
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.firstms < firstms){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.firstms < firstms) {
firstms = it->second.firstms;
}
if (it->second.firstms > lastms){
if (it->second.firstms > lastms) {
lastms = it->second.lastms;
}
}
myMeta.bufferWindow = lastms - firstms;
myMeta.vod = false;
myMeta.live = true;
IPC::semaphore liveMeta(std::string("liveMeta@" + config->getString("streamname")).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
static char liveSemName[NAME_BUFFER_SIZE];
snprintf(liveSemName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, config->getString("streamname").c_str());
IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.wait();
myMeta.writeTo(metaPage.mapped);
memset(metaPage.mapped+myMeta.getSendLen(), 0, metaPage.len > myMeta.getSendLen() ? std::min(metaPage.len-myMeta.getSendLen(), 4ll) : 0);
if (!metaPages.count(0) || !metaPages[0].mapped) {
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
metaPages[0].init(pageName, 8 * 1024 * 1024, true);
metaPages[0].master = false;
}
myMeta.writeTo(metaPages[0].mapped);
memset(metaPages[0].mapped + myMeta.getSendLen(), 0, (metaPages[0].len > myMeta.getSendLen() ? std::min(metaPages[0].len - myMeta.getSendLen(), 4ll) : 0));
liveMeta.post();
}
bool inputBuffer::removeKey(unsigned int tid){
if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active){
bool inputBuffer::removeKey(unsigned int tid) {
if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active) {
return false;
}
if (!myMeta.tracks[tid].keys.size()){
if (!myMeta.tracks[tid].keys.size()) {
return false;
}
DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%d", tid, myMeta.tracks[tid].keys[0].getNumber());
DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber());
//remove all parts of this key
for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++){
for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) {
myMeta.tracks[tid].parts.pop_front();
}
//remove the key itself
@ -88,244 +100,378 @@ namespace Mist {
//re-calculate firstms
myMeta.tracks[tid].firstms = myMeta.tracks[tid].keys[0].getTime();
//delete the fragment if it's no longer fully buffered
if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()){
if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()) {
myMeta.tracks[tid].fragments.pop_front();
myMeta.tracks[tid].missedFrags ++;
}
//if there is more than one page buffered for this track...
if (inputLoc[tid].size() > 1){
if (bufferLocations[tid].size() > 1) {
//Check if the first key starts on the second page or higher
if (myMeta.tracks[tid].keys[0].getNumber() >= (++(inputLoc[tid].begin()))->first){
if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active) {
//Find page in indexpage and null it
for (int i = 0; i < 8192; i += 8){
unsigned int thisKeyNum = ntohl(((((long long int *)(indexPages[tid].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
if (thisKeyNum < myMeta.tracks[tid].keys[0].getNumber()){
(((long long int *)(indexPages[tid].mapped + i))[0]) = 0;
for (int i = 0; i < 8192; i += 8) {
unsigned int thisKeyNum = ((((long long int *)(metaPages[tid].mapped + i))[0]) >> 32) & 0xFFFFFFFF;
if (thisKeyNum == htonl(pagesByTrack[tid].begin()->first) && ((((long long int *)(metaPages[tid].mapped + i))[0]) != 0)) {
(((long long int *)(metaPages[tid].mapped + i))[0]) = 0;
}
}
DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, inputLoc[tid].begin()->first, inputLoc[tid].begin()->first + inputLoc[tid].begin()->second.keyNum - 1);
inputLoc[tid].erase(inputLoc[tid].begin());
dataPages[tid].erase(dataPages[tid].begin());
}else{
DEBUG_MSG(DLVL_HIGH, "%d still on first page (%lu - %lu)", myMeta.tracks[tid].keys[0].getNumber(), inputLoc[tid].begin()->first, inputLoc[tid].begin()->first + inputLoc[tid].begin()->second.keyNum - 1);
DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
bufferRemove(tid, bufferLocations[tid].begin()->first);
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
int tmpNum = ntohl(tmpOffset[0]);
if (tmpNum == bufferLocations[tid].begin()->first) {
tmpOffset[0] = 0;
tmpOffset[1] = 0;
}
}
curPageNum.erase(tid);
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), (unsigned long)tid, bufferLocations[tid].begin()->first);
curPage[tid].init(thisPageName, 20971520);
curPage[tid].master = true;
curPage.erase(tid);
bufferLocations[tid].erase(bufferLocations[tid].begin());
} else {
DEBUG_MSG(DLVL_HIGH, "%lu still on first page (%lu - %lu)", myMeta.tracks[tid].keys[0].getNumber(), bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
}
}
return true;
}
void inputBuffer::removeUnused(){
void inputBuffer::finish() {
Input::finish();
for (std::map<unsigned long, std::map<unsigned long, DTSCPageData> >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++) {
for (std::map<unsigned long, DTSCPageData>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) {
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), it->first, it2->first);
curPage[it->first].init(thisPageName, 20971520, false, false);
curPage[it->first].master = true;
curPage.erase(it->first);
}
}
}
void inputBuffer::removeUnused() {
//first remove all tracks that have not been updated for too long
bool changed = true;
while (changed) {
changed = false;
long long unsigned int time = Util::bootSecs();
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
bool eraseTrack = false;
long long unsigned int compareFirst = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int compareLast = 0;
if ((time - lastUpdated[it->first]) > 5) {
for (std::map<unsigned int, DTSC::Track>::iterator it2 = myMeta.tracks.begin(); it2 != myMeta.tracks.end(); it2++) {
if (it2->first == it->first) {
continue;
}
if ((time - lastUpdated[it2->first]) > 5) {
continue;
}
if (it2->second.lastms > compareLast) {
compareLast = it2->second.lastms;
}
if (it2->second.firstms < compareFirst) {
compareFirst = it2->second.firstms;
}
}
if (compareLast) {
if ((myMeta.tracks[it->first].firstms - compareLast) > ((TIMEOUTMULTIPLIER * bufferTime) / 1000)) {
eraseTrack = true;
}
if ((compareFirst - myMeta.tracks[it->first].lastms) > ((TIMEOUTMULTIPLIER * bufferTime) / 1000)) {
eraseTrack = true;
}
}
}
if ((time - lastUpdated[it->first]) > ((TIMEOUTMULTIPLIER * bufferTime) / 1000)) {
eraseTrack = true;
}
if (eraseTrack) {
//erase this track
INFO_MSG("Erasing track %d because of timeout", it->first);
lastUpdated.erase(it->first);
bufferLocations.erase(it->first);
curPage[it->first].master = true;
curPage.erase(it->first);
curPageNum.erase(it->first);
metaPages[it->first].master = true;
metaPages.erase(it->first);
activeTracks.erase(it->first);
pushLocation.erase(it->first);
myMeta.tracks.erase(it);
changed = true;
break;
}
}
}
//find the earliest video keyframe stored
unsigned int firstVideo = 1;
for(std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "video"){
if (it->second.firstms < firstVideo || firstVideo == 1){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.type == "video") {
if (it->second.firstms < firstVideo || firstVideo == 1) {
firstVideo = it->second.firstms;
}
}
}
for(std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
//non-video tracks need to have a second keyframe that is <= firstVideo
if (it->second.type != "video"){
if (it->second.keys.size() < 2 || it->second.keys[1].getTime() > firstVideo){
if (it->second.type != "video") {
if (it->second.keys.size() < 2 || it->second.keys[1].getTime() > firstVideo) {
continue;
}
}
//Buffer cutting
while(it->second.keys.size() > 1 && it->second.keys[0].getTime() < cutTime){
if (!removeKey(it->first)){break;}
while (it->second.keys.size() > 1 && it->second.keys[0].getTime() < cutTime) {
if (!removeKey(it->first)) {
break;
}
}
//Buffer size management
while(it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime){
if (!removeKey(it->first)){break;}
while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime) {
if (!removeKey(it->first)) {
break;
}
}
}
updateMeta();
}
void inputBuffer::userCallback(char * data, size_t len, unsigned int id) {
//Static variable keeping track of the next temporary mapping to use for a track.
static int nextTempId = 1001;
//Get the counter of this user
char counter = (*(data - 1));
for (int index = 0; index < 5; index++){
char* thisData = data + (index * 6);
//Each user can have at maximum 5 elements in their userpage.
for (int index = 0; index < 5; index++) {
char * thisData = data + (index * 6);
//Get the track id from the current element
unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3];
if (value == 0xFFFFFFFF){
//Skip value 0xFFFFFFFF as this indicates a previously declined track
//Skip value 0xFFFFFFFF as this indicates a previously declined track
if (value == 0xFFFFFFFF) {
continue;
}
if (value == 0){
//Skip value 0 as this indicates an empty track
//Skip value 0 as this indicates an empty track
if (value == 0) {
continue;
}
if (pushedLoc[value] == thisData){
if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
pushedLoc.erase(value);
if (negotiateTracks.count(value)){
negotiateTracks.erase(value);
//If the current value indicates a valid trackid, and it is pushed from this user
if (pushLocation[value] == thisData) {
//Check for timeouts, and erase the track if necessary
if (counter == 126 || counter == 127 || counter == 254 || counter == 255) {
pushLocation.erase(value);
if (negotiatingTracks.count(value)) {
negotiatingTracks.erase(value);
metaPages[value].master = true;
metaPages.erase(value);
}
if (data[4] == 0xFF && data[5] == 0xFF && givenTracks.count(value)){
givenTracks.erase(value);
inputLoc.erase(value);
if (data[4] == 0xFF && data[5] == 0xFF && activeTracks.count(value)) {
activeTracks.erase(value);
bufferLocations.erase(value);
}
continue;
}
}
if (value & 0x80000000){
//Track is set to "New track request", assign new track id and create shared memory page
int tmpTid = nextTempId++;
negotiateTracks.insert(tmpTid);
thisData[0] = (tmpTid >> 24) & 0xFF;
thisData[1] = (tmpTid >> 16) & 0xFF;
thisData[2] = (tmpTid >> 8) & 0xFF;
thisData[3] = (tmpTid) & 0xFF;
unsigned long tNum = ((long)(thisData[4]) << 8) | thisData[5];
DEBUG_MSG(DLVL_HIGH, "Assigning temporary ID %d to incoming track %lu for user %d", tmpTid, tNum, id);
char tempMetaName[100];
sprintf(tempMetaName, "liveStream_%s%d", config->getString("streamname").c_str(), tmpTid);
metaPages[tmpTid].init(tempMetaName, DEFAULT_META_PAGE_SIZE, true);
//Track is set to "New track request", assign new track id and create shared memory page
//This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process
if (value & 0x80000000) {
//Set the temporary track id for this item, and increase the temporary value for use with the next track
unsigned long long tempMapping = nextTempId++;
//Add the temporary track id to the list of tracks that are currently being negotiated
negotiatingTracks.insert(tempMapping);
//Write the temporary id to the userpage element
thisData[0] = (tempMapping >> 24) & 0xFF;
thisData[1] = (tempMapping >> 16) & 0xFF;
thisData[2] = (tempMapping >> 8) & 0xFF;
thisData[3] = (tempMapping) & 0xFF;
//Obtain the original track number for the pushing process
unsigned long originalTrack = ((long)(thisData[4]) << 8) | thisData[5];
//Overwrite it with 0xFFFF
thisData[4] = 0xFF;
thisData[5] = 0xFF;
DEBUG_MSG(DLVL_HIGH, "Incoming track %lu from pushing process %d has now been assigned temporary id %llu", originalTrack, id, tempMapping);
}
if (negotiateTracks.count(value)){
//Track is currently under negotiation, check whether the metadata has been submitted
if (metaPages[value].mapped){
unsigned int len = ntohl(((int *)metaPages[value].mapped)[1]);
unsigned int i = 0;
JSON::Value JSONMeta;
JSON::fromDTMI((const unsigned char *)metaPages[value].mapped + 8, len, i, JSONMeta);
DTSC::Meta tmpMeta(JSONMeta);
if (!tmpMeta.tracks.count(value)){//Track not yet added
continue;
}
std::string tempId = tmpMeta.tracks.begin()->second.getIdentifier();
DEBUG_MSG(DLVL_HIGH, "Attempting colision detection for track %s", tempId.c_str());
int finalMap = -1;
if (tmpMeta.tracks.begin()->second.type == "video"){
finalMap = 1;
}
if (tmpMeta.tracks.begin()->second.type == "audio"){
finalMap = 2;
}
//Remove the "negotiate" status in either case
negotiateTracks.erase(value);
metaPages.erase(value);
if (finalMap != -1 && givenTracks.count(finalMap)) {
WARN_MSG("Collision of new track %lu with track %d detected! Declining track", value, finalMap);
thisData[0] = 0xFF;
thisData[1] = 0xFF;
thisData[2] = 0xFF;
thisData[3] = 0xFF;
//The track id is set to the value of a track that we are currently negotiating about
if (negotiatingTracks.count(value)) {
//If the metadata page for this track is not yet registered, initialize it
if (!metaPages.count(value) || !metaPages[value].mapped) {
char tempMetaName[NAME_BUFFER_SIZE];
snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), value);
metaPages[value].init(tempMetaName, 8388608, false, false);
}
//If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later
if (!metaPages[value].mapped) {
///\todo Maybe add a timeout counter here, for when we dont expect the track to appear anymore
continue;
}
//The page exist, now we try to read in the metadata of the track
//Store the size of the dtsc packet to read.
unsigned int len = ntohl(((int *)metaPages[value].mapped)[1]);
//Temporary variable, won't be used again
unsigned int tempForReadingMeta = 0;
//Read in the metadata through a temporary JSON object
///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately
JSON::Value tempJSONForMeta;
JSON::fromDTMI((const unsigned char *)metaPages[value].mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
//Construct a metadata object for the current track
DTSC::Meta trackMeta(tempJSONForMeta);
//If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call.
if (!trackMeta.tracks.count(value)) {
continue;
}
std::string trackIdentifier = trackMeta.tracks.find(value)->second.getIdentifier();
DEBUG_MSG(DLVL_HIGH, "Attempting colision detection for track %s", trackIdentifier.c_str());
//Remove the "negotiate" status in either case
negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
metaPages[value].master = true;
metaPages.erase(value);
int finalMap = (trackMeta.tracks.find(value)->second.type == "video" ? 1 : 2);
//Resume either if we have more than 1 keyframe on the replacement track (assume it was already pushing before the track "dissapeared")
//or if the firstms of the replacement track is later than the lastms on the existing track
if (!myMeta.tracks.count(finalMap) || trackMeta.tracks.find(value)->second.keys.size() > 1 || trackMeta.tracks.find(value)->second.firstms >= myMeta.tracks[finalMap].lastms) {
if (myMeta.tracks.count(finalMap) && myMeta.tracks[finalMap].lastms > 0) {
INFO_MSG("Resume of track %d detected, coming from temporary track %lu of user %u", finalMap, value, id);
} else {
if (finalMap == -1){
WARN_MSG("Invalid track type detected, declining.");
thisData[0] = 0xFF;
thisData[1] = 0xFF;
thisData[2] = 0xFF;
thisData[3] = 0xFF;
continue;
}else{
//Resume either if we have more than 1 keyframe on the replacement track (assume it was already pushing before the track "dissapeared"
//or if the firstms of the replacement track is later than the lastms on the existing track
if (tmpMeta.tracks.begin()->second.keys.size() > 1 || !myMeta.tracks.count(finalMap) || tmpMeta.tracks.begin()->second.firstms >= myMeta.tracks[finalMap].lastms){
if (myMeta.tracks.count(finalMap) && myMeta.tracks[finalMap].lastms > 0){
INFO_MSG("Allowing negotiation track %lu, from user %u, to resume pushing final track number %d", value, id, finalMap);
}else{
INFO_MSG("Allowing negotiation track %lu, from user %u, to start pushing final track number %d", value, id, finalMap);
}
}else{
//Otherwise replace existing track
INFO_MSG("Re-push initiated for track %lu, from user %u, will replace final track number %d", value, id, finalMap);
myMeta.tracks.erase(finalMap);
dataPages.erase(finalMap);
inputLoc.erase(finalMap);
}
}
givenTracks.insert(finalMap);
pushedLoc[finalMap] = thisData;
if (!myMeta.tracks.count(finalMap)){
DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap);
myMeta.tracks[finalMap] = tmpMeta.tracks.begin()->second;
myMeta.tracks[finalMap].trackID = finalMap;
}
thisData[0] = (finalMap >> 24) & 0xFF;
thisData[1] = (finalMap >> 16) & 0xFF;
thisData[2] = (finalMap >> 8) & 0xFF;
thisData[3] = (finalMap) & 0xFF;
int keyNum = myMeta.tracks[finalMap].keys.size();
thisData[4] = (keyNum >> 8) & 0xFF;
thisData[5] = keyNum & 0xFF;
updateMeta();
char firstPage[100];
sprintf(firstPage, "%s%d", config->getString("streamname").c_str(), finalMap);
indexPages[finalMap].init(firstPage, 8192, true);
((long long int *)indexPages[finalMap].mapped)[0] = htonl(1000);
sprintf(firstPage, "%s%d_%d", config->getString("streamname").c_str(), finalMap, keyNum);
///\todo Make size dynamic / other solution. 25mb is too much.
dataPages[finalMap][0].init(firstPage, DEFAULT_DATA_PAGE_SIZE, true);
INFO_MSG("New track detected, assigned track id %d, coming from temporary track %lu of user %u", finalMap, value, id);
}
}
//Register the new track as an active track.
activeTracks.insert(finalMap);
//Register the time of registration as initial value for the lastUpdated field.
lastUpdated[finalMap] = Util::bootSecs();
//Register the user thats is pushing this element
pushLocation[finalMap] = thisData;
//Initialize the metadata for this track if it was not in place yet.
if (!myMeta.tracks.count(finalMap)) {
DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap);
myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second;
myMeta.tracks[finalMap].trackID = finalMap;
}
//Write the final mapped track number to the user page element
thisData[0] = (finalMap >> 24) & 0xFF;
thisData[1] = (finalMap >> 16) & 0xFF;
thisData[2] = (finalMap >> 8) & 0xFF;
thisData[3] = (finalMap) & 0xFF;
//Write the key number to start pushing from to to the userpage element.
//This is used to resume pushing as well as pushing new tracks
unsigned long keyNum = myMeta.tracks[finalMap].keys.size();
thisData[4] = (keyNum >> 8) & 0xFF;
thisData[5] = keyNum & 0xFF;
//Update the metadata to reflect all changes
updateMeta();
}
if (givenTracks.count(value) && pushedLoc[value] == thisData){
//First check if the previous page has been finished:
if (!inputLoc[value].count(dataPages[value].rbegin()->first) || !inputLoc[value][dataPages[value].rbegin()->first].curOffset){
if (dataPages[value].size() > 1){
int previousPage = (++dataPages[value].rbegin())->first;
updateMetaFromPage(value, previousPage);
}
//If the track is active, and this is the element responsible for pushing it
if (activeTracks.count(value) && pushLocation[value] == thisData) {
//Open the track index page if we dont have it open yet
if (!metaPages.count(value) || !metaPages[value].mapped) {
char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), value);
metaPages[value].init(firstPage, 8192, false, false);
}
//update current page
int currentPage = dataPages[value].rbegin()->first;
updateMetaFromPage(value, currentPage);
if (inputLoc[value][currentPage].curOffset > FLIP_DATA_PAGE_SIZE) {
int nextPage = currentPage + inputLoc[value][currentPage].keyNum;
char nextPageName[100];
sprintf(nextPageName, "%s%lu_%d", config->getString("streamname").c_str(), value, nextPage);
dataPages[value][nextPage].init(nextPageName, DEFAULT_DATA_PAGE_SIZE, true);
DEVEL_MSG("Created page %s, from pos %llu", nextPageName, inputLoc[value][currentPage].curOffset);
bool createdNew = false;
for (int i = 0; i < 8192; i += 8){
unsigned int thisKeyNum = ((((long long int *)(indexPages[value].mapped + i))[0]) >> 32) & 0xFFFFFFFF;
if (thisKeyNum == htonl(currentPage)){
if((ntohl((((long long int*)(indexPages[value].mapped + i))[0]) & 0xFFFFFFFF) == 1000)){
((long long int *)(indexPages[value].mapped + i))[0] &= 0xFFFFFFFF00000000ull;
((long long int *)(indexPages[value].mapped + i))[0] |= htonl(inputLoc[value][currentPage].keyNum);
}
}
if (!createdNew && (((long long int*)(indexPages[value].mapped + i))[0]) == 0){
createdNew = true;
((long long int *)(indexPages[value].mapped + i))[0] = (((long long int)htonl(nextPage)) << 32) | htonl(1000);
}
}
if (!createdNew){
ERROR_MSG("Could not create index for new page - out of empty indexes!");
}
if (metaPages[value].mapped) {
//Update the metadata for this track
updateTrackMeta(value);
}
}
}
}
void inputBuffer::updateMetaFromPage(int tNum, int pageNum){
DTSC::Packet tmpPack;
tmpPack.reInit(dataPages[tNum][pageNum].mapped + inputLoc[tNum][pageNum].curOffset, 0);
if (!tmpPack && inputLoc[tNum][pageNum].curOffset == 0){
return;
}
while (tmpPack) {
myMeta.update(tmpPack);
if (inputLoc[tNum][pageNum].firstTime == 0){
inputLoc[tNum][pageNum].firstTime = tmpPack.getTime();
void inputBuffer::updateTrackMeta(unsigned long tNum) {
//Store a reference for easier access
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum];
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) {
int * tmpOffset = (int *)(metaPages[tNum].mapped + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0) {
continue;
}
inputLoc[tNum][pageNum].keyNum += tmpPack.getFlag("keyframe");
inputLoc[tNum][pageNum].curOffset += tmpPack.getDataLen();
tmpPack.reInit(dataPages[tNum][pageNum].mapped + inputLoc[tNum][pageNum].curOffset, 0);
unsigned long keyNum = ntohl(tmpOffset[0]);
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)) {
locations[keyNum].curOffset = 0;
}
locations[keyNum].pageNum = keyNum;
locations[keyNum].keyNum = ntohl(tmpOffset[1]);
}
//Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest
for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) {
updateMetaFromPage(tNum, pageIt->first);
}
updateMeta();
}
void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum) {
DTSCPageData & pageData = bufferLocations[tNum][pageNum];
//If the current page is over its 8mb "splitting" boundary
if (pageData.curOffset > (8 * 1024 * 1024)) {
//And the last keyframe in the parsed metadata is further in the stream than this page
if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()) {
//Assume the entire page is already parsed
return;
}
}
//Otherwise open and parse the page
//Open the page if it is not yet open
if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum) {
//DO NOT ERASE THE PAGE HERE, master is not set to true
curPageNum.erase(tNum);
char nextPageName[NAME_BUFFER_SIZE];
snprintf(nextPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tNum, pageNum);
curPage[tNum].init(nextPageName, 20971520);
//If the page can not be opened, stop here
if (!curPage[tNum].mapped) {
///\todo Maybe generate a warning here?
return;
}
curPageNum[tNum] = pageNum;
}
DTSC::Packet tmpPack;
tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0);
//No new data has been written on the page since last update
if (!tmpPack) {
return;
}
lastUpdated[tNum] = Util::bootSecs();
while (tmpPack) {
//Update the metadata with this packet
myMeta.update(tmpPack);
//Set the first time when appropriate
if (pageData.firstTime == 0) {
pageData.firstTime = tmpPack.getTime();
}
//Update the offset on the page with the size of the current packet
pageData.curOffset += tmpPack.getDataLen();
//Attempt to read in the next packet
tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0);
}
}
bool inputBuffer::setup() {
std::string strName = config->getString("streamname");
Util::sanitizeName(strName);
strName = strName.substr(0,(strName.find_first_of("+ ")));
strName = strName.substr(0, (strName.find_first_of("+ ")));
IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
@ -333,19 +479,19 @@ namespace Mist {
long long tmpNum;
//if stream is configured and setting is present, use it, always
if (streamCfg && streamCfg.getMember("DVR")){
if (streamCfg && streamCfg.getMember("DVR")) {
tmpNum = streamCfg.getMember("DVR").asInt();
}else{
if (streamCfg){
} else {
if (streamCfg) {
//otherwise, if stream is configured use the default
tmpNum = config->getOption("bufferTime", true)[0u].asInt();
}else{
} else {
//if not, use the commandline argument
tmpNum = config->getOption("bufferTime").asInt();
}
}
//if the new value is different, print a message and apply it
if (bufferTime != tmpNum){
if (bufferTime != tmpNum) {
DEBUG_MSG(DLVL_DEVEL, "Setting bufferTime from %u to new value of %lli", bufferTime, tmpNum);
bufferTime = tmpNum;
}

View file

@ -16,18 +16,20 @@ namespace Mist {
void updateMeta();
bool readHeader();
void getNext(bool smart = true);
void updateMetaFromPage(int tNum, int pageNum);
void updateTrackMeta(unsigned long tNum);
void updateMetaFromPage(unsigned long tNum, unsigned long pageNum);
void seek(int seekTime);
void trackSelect(std::string trackSpec);
bool removeKey(unsigned int tid);
void removeUnused();
void finish();
void userCallback(char * data, size_t len, unsigned int id);
std::set<unsigned long> negotiateTracks;
std::set<unsigned long> givenTracks;
std::map<unsigned long, IPC::sharedPage> metaPages;
std::set<unsigned long> negotiatingTracks;
std::set<unsigned long> activeTracks;
std::map<unsigned long, unsigned long long> lastUpdated;
///Maps trackid to a pagenum->pageData map
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > inputLoc;
std::map<unsigned long, char *> pushedLoc;
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > bufferLocations;
std::map<unsigned long, char *> pushLocation;
inputBuffer * singleton;
};
}

View file

@ -74,7 +74,7 @@ namespace Mist {
}else{
inFile.parseNext();
}
lastPack = inFile.getPacket();
thisPacket = inFile.getPacket();
}
void inputDTSC::seek(int seekTime) {

View file

@ -105,11 +105,11 @@ namespace Mist {
if (FLV::Parse_Error){
FAIL_MSG("FLV error: %s", FLV::Error_Str.c_str());
thisPack.null();
lastPack.null();
thisPacket.null();
return;
}
std::string tmpStr = thisPack.toNetPacked();
lastPack.reInit(tmpStr.data(), tmpStr.size());
thisPacket.reInit(tmpStr.data(), tmpStr.size());
}
void inputFLV::seek(int seekTime) {

View file

@ -86,8 +86,8 @@ namespace Mist {
getNext();
while (lastPack){
myMeta.update(lastPack);
while (thisPacket){
myMeta.update(thisPacket);
getNext();
}
@ -100,7 +100,7 @@ namespace Mist {
}
void inputMP3::getNext(bool smart) {
lastPack.null();
thisPacket.null();
static char packHeader[3000];
size_t filePos = ftell(inFile);
size_t read = fread(packHeader, 1, 3000, inFile);
@ -169,7 +169,7 @@ namespace Mist {
thisPack["time"] = (long long)timestamp;
//Write the json value to lastpack
std::string tmpStr = thisPack.toNetPacked();
lastPack.reInit(tmpStr.data(), tmpStr.size());
thisPacket.reInit(tmpStr.data(), tmpStr.size());
//Update the internal timestamp

View file

@ -217,8 +217,8 @@ namespace Mist {
}
}
getNext();
while (lastPack){
myMeta.update(lastPack);
while (thisPacket){
myMeta.update(thisPacket);
getNext();
}
@ -276,7 +276,7 @@ namespace Mist {
void inputOGG::getNext(bool smart){
if (!currentPositions.size()){
lastPack.null();
thisPacket.null();
return;
}
bool lastCompleteSegment = false;
@ -330,7 +330,7 @@ namespace Mist {
readFullPacket = true;
}
std::string tmpStr = thisSegment.toJSON(oggTracks[thisSegment.tid].codec).toNetPacked();
lastPack.reInit(tmpStr.data(), tmpStr.size());
thisPacket.reInit(tmpStr.data(), tmpStr.size());
if (oggTracks[thisSegment.tid].codec == OGG::VORBIS){
unsigned long blockSize = 0;
@ -351,8 +351,8 @@ namespace Mist {
thisSegment.time = oggTracks[thisSegment.tid].msPerFrame * (parseGranuleUpper + parseGranuleLower - 1);
curPos.time = thisSegment.time;
std::string tmpStr = thisSegment.toJSON(oggTracks[thisSegment.tid].codec).toNetPacked();
lastPack.reInit(tmpStr.data(), tmpStr.size());
// INFO_MSG("thisTime: %d", lastPack.getTime());
thisPacket.reInit(tmpStr.data(), tmpStr.size());
// INFO_MSG("thisTime: %d", thisPacket.getTime());
}
curPos.time += oggTracks[thisSegment.tid].msPerFrame;
}
@ -398,7 +398,7 @@ namespace Mist {
DEBUG_MSG(DLVL_MEDIUM, "Seeking to %dms", seekTime);
//for every track
for (std::set<unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
//find first keyframe before keyframe with ms > seektime
position tmpPos;
tmpPos.trackID = *it;
@ -412,7 +412,7 @@ namespace Mist {
tmpPos.bytepos = ot->getBpos();
}
}
INFO_MSG("Found %dms for track %u at %llu bytepos %llu", seekTime, *it, tmpPos.time, tmpPos.bytepos);
INFO_MSG("Found %dms for track %lu at %llu bytepos %llu", seekTime, *it, tmpPos.time, tmpPos.bytepos);
int backChrs=std::min(280ull, tmpPos.bytepos - 1);
fseek(inFile, tmpPos.bytepos - backChrs, SEEK_SET);
char buffer[300];
@ -422,12 +422,12 @@ namespace Mist {
loc = (char *)memrchr(buffer, 'O', (loc-buffer) -1 );//seek reverse
}
if (!loc){
INFO_MSG("Unable to find a page boundary starting @ %llu, track %u", tmpPos.bytepos, *it);
INFO_MSG("Unable to find a page boundary starting @ %llu, track %lu", tmpPos.bytepos, *it);
continue;
}
tmpPos.segmentNo = backChrs - (loc - buffer);
tmpPos.bytepos -= tmpPos.segmentNo;
INFO_MSG("Track %u, segment %llu found at bytepos %llu", *it, tmpPos.segmentNo, tmpPos.bytepos);
INFO_MSG("Track %lu, segment %llu found at bytepos %llu", *it, tmpPos.segmentNo, tmpPos.bytepos);
currentPositions.insert(tmpPos);
}

515
src/io.cpp Normal file
View file

@ -0,0 +1,515 @@
#include "io.h"
namespace Mist {
Util::Config * InOutBase::config = NULL;
///Opens a shared memory page for the stream metadata.
///
///Assumes myMeta contains the metadata to write.
void InOutBase::initiateMeta() {
//Open the page for the metadata
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
metaPages[0].init(pageName, 8 * 1024 * 1024, true);
//Make sure we don't delete it on accident
metaPages[0].master = false;
//Write the metadata to the page
myMeta.writeTo(metaPages[0].mapped);
}
///Starts the buffering of a new page.
///
///Does not do any actual buffering, just sets the right bits for buffering to go right.
///
///Buffering itself is done by bufferNext().
///\param tid The trackid of the page to start buffering
///\param pageNumber The number of the page to start buffering
bool InOutBase::bufferStart(unsigned long tid, unsigned long pageNumber) {
//Initialize the stream metadata if it does not yet exist
if (!metaPages.count(0)) {
initiateMeta();
}
//If we are a stand-alone player skip track negotiation, as there will be nothing to negotiate with.
if (standAlone) {
if (!trackMap.count(tid)) {
trackMap[tid] = tid;
}
}
//Negotiate the requested track if needed.
continueNegotiate(tid);
//If the negotation state for this track is not 'Accepted', stop buffering this page, maybe try again later.
if (trackState[tid] != FILL_ACC) {
///\return false if the track has not been accepted (yet)
return false;
}
//If the track is accepted, we will have a mapped tid
unsigned long mapTid = trackMap[tid];
//If we are currently buffering a page, abandon it completely and print a message about this
//This page will NEVER be deleted, unless we open it again later.
if (curPage.count(tid)) {
WARN_MSG("Abandoning current page (%lu) for track %lu~>%lu", curPageNum[tid], tid, mapTid);
curPage.erase(tid);
curPageNum.erase(tid);
}
//If this is not a valid page number on this track, stop buffering this page.
if (!pagesByTrack[tid].count(pageNumber)){
INFO_MSG("Aborting page buffer start: %lu is not a valid page number on track %lu~>%lu.", pageNumber, tid, mapTid);
std::stringstream test;
for (std::map<unsigned long, DTSCPageData>::iterator it = pagesByTrack[tid].begin(); it != pagesByTrack[tid].end(); it++){
test << it->first << " ";
}
INFO_MSG("%s are", test.str().c_str());
///\return false if the pagenumber is not valid for this track
return false;
}
//If the page is already buffered, ignore this request
if (isBuffered(tid, pageNumber)) {
INFO_MSG("Page %lu on track %lu~>%lu already buffered", pageNumber, tid, mapTid);
///\return false if the page was already buffered.
return false;
}
//Open the correct page for the data
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), mapTid, pageNumber);
std::string pageName(pageId);
#ifdef __CYGWIN__
curPage[tid].init(pageName, 26 * 1024 * 1024, true);
#else
curPage[tid].init(pageName, pagesByTrack[tid][pageNumber].dataSize, true);
#endif
//Make sure the data page is not destroyed when we are done buffering it later on.
curPage[tid].master = false;
//Store the pagenumber of the currently buffer page
curPageNum[tid] = pageNumber;
//Initialize the bookkeeping entry, and set the current offset to 0, to allow for using it in bufferNext()
pagesByTrack[tid][pageNumber].curOffset = 0;
//Register this page on the meta page
bool inserted = false;
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
if ((tmpOffset[0] == 0 && tmpOffset[1] == 0)) {
tmpOffset[0] = htonl(curPageNum[tid]);
if (pagesByTrack[tid][pageNumber].dataSize == (25 * 1024 * 1024)){
tmpOffset[1] = htonl(1000);
} else {
tmpOffset[1] = htonl(pagesByTrack[tid][pageNumber].keyNum);
}
inserted = true;
break;
}
}
INFO_MSG("Start buffering page %lu on track %lu~>%lu successful", pageNumber, tid, mapTid);
///\return true if everything was successful
return true;
}
///Removes a fully buffered page
///
///Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function.
///\param tid The trackid to remove the page from
///\param pageNumber The number of the page to remove
void InOutBase::bufferRemove(unsigned long tid, unsigned long pageNumber) {
if (!standAlone) {
//A different process will handle this for us
return;
}
//Do nothing if the page is not buffered
if (!isBuffered(tid, pageNumber)) {
return;
}
unsigned long mapTid = trackMap[tid];
//If the given pagenumber is not a valid page on this track, do nothing
if (!pagesByTrack[tid].count(pageNumber)){
INFO_MSG("Can't remove page %lu on track %lu~>%lu as it is not a valid page number.", pageNumber, tid, mapTid);
return;
}
//Open the correct page
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), mapTid, pageNumber);
std::string pageName(pageId);
IPC::sharedPage toErase;
#ifdef __CYGWIN__
toErase.init(pageName, 26 * 1024 * 1024, false);
#else
toErase.init(pageName, pagesByTrack[tid][pageNumber].dataSize, false);
#endif
//Set the master flag so that the page will be destoryed once it leaves scope
toErase.master = true;
//Remove the page from the tracks index page
DEBUG_MSG(DLVL_HIGH, "Removing page %lu on track %lu~>%lu from the corresponding metaPage", pageNumber, tid, mapTid);
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
if (ntohl(tmpOffset[0]) == pageNumber) {
tmpOffset[0] = 0;
tmpOffset[1] = 0;
}
}
//Leaving scope here, the page will now be destroyed
}
///Checks whether a key is buffered
///\param tid The trackid on which to locate the key
///\param keyNum The number of the keyframe to find
bool InOutBase::isBuffered(unsigned long tid, unsigned long keyNum) {
///\return The result of bufferedOnPage(tid, keyNum)
return bufferedOnPage(tid, keyNum);
}
///Returns the pagenumber where this key is buffered on
///\param tid The trackid on which to locate the key
///\param keyNum The number of the keyframe to find
unsigned long InOutBase::bufferedOnPage(unsigned long tid, unsigned long keyNum) {
//Check whether the track is accepted
if (!trackMap.count(tid) || !metaPages.count(tid) || !metaPages[tid].mapped) {
///\return 0 if the page has not been mapped yet
return 0;
}
//Loop over the index page
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
int pageNum = ntohl(tmpOffset[0]);
int keyAmount = ntohl(tmpOffset[1]);
//Check whether the key is on this page
if (pageNum <= keyNum && keyNum < pageNum + keyAmount) {
///\return The pagenumber of the page the key is located on, if the page is registered on the track index page
return pageNum;
}
}
///\return 0 if the key was not found
return 0;
}
///Buffers the next packet on the currently opened page
///\param pack The packet to buffer
void InOutBase::bufferNext(JSON::Value & pack) {
std::string packData = pack.toNetPacked();
DTSC::Packet newPack(packData.data(), packData.size());
///\note Internally calls bufferNext(DTSC::Packet & pack)
bufferNext(newPack);
}
///Buffers the next packet on the currently opened page
///\param pack The packet to buffer
void InOutBase::bufferNext(DTSC::Packet & pack) {
//Save the trackid of the track for easier access
unsigned long tid = pack.getTrackId();
unsigned long mapTid = trackMap[tid];
//Do nothing if no page is opened for this track
if (!curPage.count(tid)) {
INFO_MSG("Trying to buffer a packet on track %lu~>%lu, but no page is initialized", tid, mapTid);
return;
}
//Save the current write position
size_t curOffset = pagesByTrack[tid][curPageNum[tid]].curOffset;
//Do nothing when there is not enough free space on the page to add the packet.
if (pagesByTrack[tid][curPageNum[tid]].dataSize - curOffset < pack.getDataLen()) {
INFO_MSG("Trying to buffer a packet on page %lu for track %lu~>%lu, but we have a size mismatch", curPageNum[tid], tid, mapTid);
return;
}
//Brain melt starts here
//First memcpy only the payload to the destination
//Leaves the 20 bytes inbetween empty to ensure the data is not accidentally read before it is complete
memcpy(curPage[tid].mapped + curOffset + 20, pack.getData() + 20, pack.getDataLen() - 20);
//Copy the remaing values in reverse order:
//8 byte timestamp
memcpy(curPage[tid].mapped + curOffset + 12, pack.getData() + 12, 8);
//The mapped track id
((int *)(curPage[tid].mapped + curOffset + 8))[0] = htonl(mapTid);
//Write the size and 'DTP2' bytes to conclude the packet and allow for reading it
memcpy(curPage[tid].mapped + curOffset, pack.getData(), 8);
if (myMeta.live){
//Update the metadata
DTSC::Packet updatePack(curPage[tid].mapped + curOffset, pack.getDataLen(), true);
myMeta.update(updatePack);
}
//End of brain melt
pagesByTrack[tid][curPageNum[tid]].curOffset += pack.getDataLen();
}
///Wraps up the buffering of a shared memory data page
///
///Registers the data page on the track index page as well
///\param tid The trackid of the page to finalize
void InOutBase::bufferFinalize(unsigned long tid) {
unsigned long mapTid = trackMap[tid];
//If no page is open, do nothing
if (!curPage.count(tid)) {
INFO_MSG("Trying to finalize the current page on track %lu~>%lu, but no page is initialized", tid, mapTid);
return;
}
//Keep track of registering the page on the track's index page
bool inserted = false;
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
int keyNum = ntohl(tmpOffset[0]);
int keyAmount = ntohl(tmpOffset[1]);
if (keyNum == curPageNum[tid]){
if (keyAmount == 1000){
tmpOffset[1] = htonl(pagesByTrack[tid][curPageNum[tid]].keyNum);
}
inserted = true;
break;
}
}
//Print a message about registering the page or not.
if (!inserted) {
INFO_MSG("Can't register page %lu on the metaPage of track %lu~>%lu, No empty spots left within 'should be' amount of slots", curPageNum[tid], tid, mapTid);
} else {
INFO_MSG("Succesfully registered page %lu on the metaPage of track %lu~>%lu.", curPageNum[tid], tid, mapTid);
}
//Close our link to the page. This will NOT destroy the shared page, as we've set master to false upon construction
curPage.erase(tid);
curPageNum.erase(tid);
}
///Buffers a live packet to a page.
///
///Handles both buffering and creation of new pages
///
///Initiates/continues negotiation with the buffer as well
///\param packet The packet to buffer
void InOutBase::bufferLivePacket(JSON::Value & packet) {
//Store the trackid for easier access
unsigned long tid = packet["trackid"].asInt();
//Do nothing if the trackid is invalid
if (!tid) {
INFO_MSG("Packet without trackid");
return;
}
//If the track is not negotiated yet, start the negotiation
if (!trackState.count(tid)) {
continueNegotiate(tid);
}
//If the track is declined, stop here
if (trackState[tid] == FILL_DEC) {
INFO_MSG("Track %lu Declined", tid);
return;
}
//Check if a different track is already accepted
bool shouldBlock = true;
if (pagesByTrack.count(tid) && pagesByTrack[tid].size()) {
for (std::map<unsigned long, negotiationState>::iterator it = trackState.begin(); it != trackState.end(); it++) {
if (it->second == FILL_ACC) {
//If so, we do not block here
shouldBlock = false;
}
}
}
//Block if no tracks are accepted yet, until we have a definite state
if (shouldBlock) {
while (trackState[tid] != FILL_DEC && trackState[tid] != FILL_ACC) {
INFO_MSG("Blocking on track %lu", tid);
continueNegotiate(tid);
Util::sleep(500);
}
}
//This update needs to happen whether the track is accepted or not.
///\todo Figure out how to act with declined track here
bool isKeyframe = false;
if (myMeta.tracks[tid].type == "video") {
if (packet.isMember("keyframe") && packet["keyframe"]) {
isKeyframe = true;
}
} else {
if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0) {
//Assume this is the first packet on the track
isKeyframe = true;
} else {
unsigned long lastKey = pagesByTrack[tid].rbegin()->second.lastKeyTime;
if (packet["time"].asInt() - lastKey > 5000) {
isKeyframe = true;
}
}
}
//Determine if we need to open the next page
int nextPageNum = -1;
if (isKeyframe) {
//If there is no page, create it
if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0) {
nextPageNum = 1;
pagesByTrack[tid][1].dataSize = (25 * 1024 * 1024);//Initialize op 25mb
pagesByTrack[tid][1].pageNum = 1;
}
//Take the last allocated page
std::map<unsigned long, DTSCPageData>::reverse_iterator tmpIt = pagesByTrack[tid].rbegin();
//Compare on 8 mb boundary
if (tmpIt->second.curOffset > (8 * 1024 * 1024)) {
//Create the book keeping data for the new page
nextPageNum = tmpIt->second.pageNum + tmpIt->second.keyNum;
INFO_MSG("We should go to next page now, transition from %lu to %d", tmpIt->second.pageNum, nextPageNum);
pagesByTrack[tid][nextPageNum].dataSize = (25 * 1024 * 1024);
pagesByTrack[tid][nextPageNum].pageNum = nextPageNum;
}
pagesByTrack[tid].rbegin()->second.lastKeyTime = packet["time"].asInt();
pagesByTrack[tid].rbegin()->second.keyNum++;
}
//Set the pageNumber if it has not been set yet
if (nextPageNum == -1) {
if (curPageNum.count(tid)) {
nextPageNum = curPageNum[tid];
}else{
nextPageNum = 1;
}
}
//At this point we can stop parsing when the track is not accepted
if (trackState[tid] != FILL_ACC) {
return;
}
//Check if the correct page is opened
if (!curPageNum.count(tid) || nextPageNum != curPageNum[tid]) {
if (curPageNum.count(tid)) {
//Close the currently opened page when it exists
bufferFinalize(tid);
}
//Open the new page
bufferStart(tid, nextPageNum);
}
//Buffer the packet
bufferNext(packet);
}
void InOutBase::continueNegotiate(unsigned long tid) {
if (!tid) {
return;
}
userClient.keepAlive();
if (trackMap.count(tid) && !trackState.count(tid)) {
//If the trackmap has been set manually, don't negotiate
INFO_MSG("Manually Set TrackMap");
trackState[tid] = FILL_ACC;
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), tid);
metaPages[tid].init(pageName, 8 * 1024 * 1024, true);
metaPages[tid].master = false;
return;
}
if (trackState.count(tid) && (trackState[tid] == FILL_DEC || trackState[tid] == FILL_ACC)) {
HIGH_MSG("Do Not Renegotiate");
//dont try to re-negoiate existing tracks, if this is what you want, remove the tid from the trackState before calling this function
return;
}
if (!trackOffset.count(tid)) {
if (trackOffset.size() >= 5) {
INFO_MSG("Trackoffset too high");
return;
}
//Find a free offset for the new track
for (int i = 0; i < 5; i++) {
bool isFree = true;
for (std::map<unsigned long, unsigned long>::iterator it = trackOffset.begin(); it != trackOffset.end(); it++) {
if (it->second == i) {
isFree = false;
break;
}
}
if (isFree) {
trackOffset[tid] = i;
break;
}
}
}
//Now we either returned or the track has an offset for the user page.
//Get the data from the userPage
char * tmp = userClient.getData();
if (!tmp) {
DEBUG_MSG(DLVL_FAIL, "Failed to negotiate for incoming track %lu, there does not seem to be a connection with the buffer", tid);
return;
}
unsigned long offset = 6 * trackOffset[tid];
//If we have a new track to negotiate
if (!trackState.count(tid)) {
INFO_MSG("Starting negotiation for incoming track %lu, at offset %lu", tid, trackOffset[tid]);
memset(tmp + offset, 0, 4);
tmp[offset] = 0x80;
tmp[offset + 4] = ((tid >> 8) & 0xFF);
tmp[offset + 5] = (tid & 0xFF);
trackState[tid] = FILL_NEW;
return;
}
switch (trackState[tid]) {
case FILL_NEW: {
unsigned long newTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3];
INSANE_MSG("NewTid: %0.8lX", newTid);
if (newTid == 0x80000000u) {
INSANE_MSG("Breaking because not set yet");
break;
}
INFO_MSG("Track %lu temporarily mapped to %lu", tid, newTid);
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), newTid);
metaPages[tid].init(pageName, 8 * 1024 * 1024, true);
metaPages[tid].master = false;
DTSC::Meta tmpMeta;
tmpMeta.tracks[tid] = myMeta.tracks[tid];
tmpMeta.tracks[tid].trackID = newTid;
JSON::Value tmpVal = tmpMeta.toJSON();
std::string tmpStr = tmpVal.toNetPacked();
memcpy(metaPages[tid].mapped, tmpStr.data(), tmpStr.size());
INFO_MSG("Temporary metadata written for incoming track %lu, handling as track %lu", tid, newTid);
//Not actually removing the page, because we set master to false
metaPages.erase(tid);
trackState[tid] = FILL_NEG;
trackMap[tid] = newTid;
break;
}
case FILL_NEG: {
unsigned long finalTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3];
unsigned long firstPage = firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5];
if (firstPage == 0xFFFF) {
INFO_MSG("Negotiating, but firstPage not yet set, waiting for buffer");
break;
}
if (finalTid == 0xFFFFFFFF) {
WARN_MSG("Buffer has declined incoming track %lu", tid);
memset(tmp + offset, 0, 6);
trackState[tid] = FILL_DEC;
trackMap.erase(tid);
break;
}
//Reinitialize so we can be sure we got the right values here
finalTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3];
firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5];
if (finalTid == 0xFFFFFFFF) {
WARN_MSG("Buffer has declined incoming track %lu", tid);
memset(tmp + offset, 0, 6);
trackState[tid] = FILL_DEC;
trackMap.erase(tid);
break;
}
INFO_MSG("Buffer has indicated that incoming track %lu should start writing on track %lu, page %lu", tid, finalTid, firstPage);
trackMap[tid] = finalTid;
trackState[tid] = FILL_ACC;
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), finalTid);
metaPages[tid].init(pageName, 8 * 1024 * 1024, true);
metaPages[tid].master = false;
break;
}
default:
//We can't get here because we catch this case in the beginning of the function,
//this case surpresses a compiler warning
break;
}
}
}

68
src/io.h Normal file
View file

@ -0,0 +1,68 @@
#pragma once
#include <map>
#include <deque>
#include <mist/shared_memory.h>
#include <mist/defines.h>
#include <mist/dtsc.h>
namespace Mist {
enum negotiationState {
FILL_NEW,///< New track, just sent negotiation request
FILL_NEG,///< Negotiating this track, written metadata
FILL_DEC,///< Declined Track
FILL_ACC///< Accepted Track
};
struct DTSCPageData {
DTSCPageData() : pageNum(0), keyNum(0), partNum(0), dataSize(0), curOffset(0), firstTime(0), lastKeyTime(-5000){}
unsigned long pageNum;///The current page number
unsigned long keyNum;///<The number of keyframes in this page.
unsigned long partNum;///<The number of parts in this page.
unsigned long long int dataSize;///<The full size this page should be.
unsigned long long int curOffset;///<The current write offset in the page.
unsigned long long int firstTime;///<The first timestamp of the page.
unsigned long lastKeyTime;///<The last key time encountered on this track.
};
///\brief Class containing all basic input and output functions.
class InOutBase {
public:
void initiateMeta();
bool bufferStart(unsigned long tid, unsigned long pageNumber);
void bufferNext(DTSC::Packet & pack);
void bufferNext(JSON::Value & pack);
void bufferFinalize(unsigned long tid);
void bufferRemove(unsigned long tid, unsigned long pageNumber);
void bufferLivePacket(JSON::Value & packet);
bool isBuffered(unsigned long tid, unsigned long keyNum);
unsigned long bufferedOnPage(unsigned long tid, unsigned long keyNum);
protected:
bool standAlone;
static Util::Config * config;
void continueNegotiate(unsigned long tid);
DTSC::Packet thisPacket;//The current packet that is being parsed
std::string streamName;///< Name of the stream to connect to
IPC::sharedClient userClient;///< Shared memory used for connection to Mixer process.
DTSC::Meta myMeta;///< Stores either the input or output metadata
std::set<unsigned long> selectedTracks;///< Stores the track id's that are either selected for playback or input
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > pagesByTrack;///<Holds the data for all pages of a track. Based on unmapped tids
//Negotiation stuff (from unmapped tid's)
std::map<unsigned long, unsigned long> trackOffset; ///< Offset of data on user page
std::map<unsigned long, negotiationState> trackState; ///< State of the negotiation for tracks
std::map<unsigned long, unsigned long> trackMap;///<Determines which input track maps onto which "final" track
//Using mapped tid's
std::map<unsigned long, IPC::sharedPage> metaPages;///< For each track, holds the page that describes which dataPages are mapped
std::map<unsigned long, unsigned long> curPageNum;///< For each track, holds the number page that is currently being written.
std::map<unsigned long, IPC::sharedPage> curPage;///< For each track, holds the page that is currently being written.
std::map<unsigned long, std::deque<DTSC::Packet> > trackBuffer; ///< Buffer to be used during active track negotiation
};
}

View file

@ -13,7 +13,6 @@
#include "output.h"
namespace Mist {
Util::Config * Output::config = NULL;
JSON::Value Output::capa = JSON::Value();
int getDTSCLen(char * mapped, long long int offset){
@ -61,13 +60,15 @@ namespace Mist {
void Output::updateMeta(){
//read metadata from page to myMeta variable
IPC::semaphore liveMeta(std::string("liveMeta@" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
static char liveSemName[NAME_BUFFER_SIZE];
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
bool lock = myMeta.live;
if (lock){
liveMeta.wait();
}
if (streamIndex.mapped){
DTSC::Packet tmpMeta(streamIndex.mapped, streamIndex.len, true);
if (metaPages[0].mapped){
DTSC::Packet tmpMeta(metaPages[0].mapped, metaPages[0].len, true);
if (tmpMeta.getVersion()){
myMeta.reinit(tmpMeta);
}
@ -85,150 +86,11 @@ namespace Mist {
myConn.close();
}
void Output::negotiateWithBuffer(int tid){
//Check whether the track exists
if (!meta_out.tracks.count(tid)) {
return;
}
//Do not re-negotiate already confirmed tracks
if (trackMap.count(tid)){
return;
}
//Do not re-negotiate if already at maximum for push tracks
if (trackMap.size() >= 5){
DEBUG_MSG(DLVL_FAIL, "Failed to negotiate for incoming track %d, already at maximum number of tracks", tid);
return;
}
char * tmp = playerConn.getData();
if (!tmp){
DEBUG_MSG(DLVL_FAIL, "Failed to negotiate for incoming track %d, there does not seem to be a connection with the buffer", tid);
return;
}
int bufConnOffset = trackMap.size();
DEBUG_MSG(DLVL_DEVEL, "Starting negotiation for incoming track %d, at offset %d", tid, bufConnOffset);
memset(tmp + 6 * bufConnOffset, 0, 4);
tmp[6 * bufConnOffset] = 0x80;
tmp[6 * bufConnOffset + 4] = 0xFF;
tmp[6 * bufConnOffset + 5] = 0xFF;
playerConn.keepAlive();
unsigned int newTid = 0x80000000u;
while (newTid == 0x80000000u){
Util::sleep(100);
newTid = ((long)(tmp[6 * bufConnOffset]) << 24) | ((long)(tmp[6 * bufConnOffset + 1]) << 16) | ((long)(tmp[6 * bufConnOffset + 2]) << 8) | tmp[6 * bufConnOffset + 3];
}
DEBUG_MSG(DLVL_VERYHIGH, "Track %d temporarily mapped to %d", tid, newTid);
char pageName[100];
sprintf(pageName, "liveStream_%s%d", streamName.c_str(), newTid);
IPC::sharedPage metaPage(pageName, 8 * 1024 * 1024);
DTSC::Meta tmpMeta = meta_out;
tmpMeta.tracks.clear();
tmpMeta.tracks[newTid] = meta_out.tracks[tid];
tmpMeta.tracks[newTid].trackID = newTid;
JSON::Value tmpVal = tmpMeta.toJSON();
std::string tmpStr = tmpVal.toNetPacked();
memcpy(metaPage.mapped, tmpStr.data(), tmpStr.size());
DEBUG_MSG(DLVL_VERYHIGH, "Temporary metadata written for incoming track %d, handling as track %d", tid, newTid);
unsigned short firstPage = 0xFFFF;
unsigned int finalTid = newTid;
while (firstPage == 0xFFFF){
DEBUG_MSG(DLVL_VERYHIGH, "Re-checking at offset %d", bufConnOffset);
Util::sleep(100);
finalTid = ((long)(tmp[6 * bufConnOffset]) << 24) | ((long)(tmp[6 * bufConnOffset + 1]) << 16) | ((long)(tmp[6 * bufConnOffset + 2]) << 8) | tmp[6 * bufConnOffset + 3];
firstPage = ((long)(tmp[6 * bufConnOffset + 4]) << 8) | tmp[6 * bufConnOffset + 5];
if (finalTid == 0xFFFFFFFF){
WARN_MSG("Buffer has declined incoming track %d", tid);
return;
}
}
//Reinitialize so we make sure we got the right values here
finalTid = ((long)(tmp[6 * bufConnOffset]) << 24) | ((long)(tmp[6 * bufConnOffset + 1]) << 16) | ((long)(tmp[6 * bufConnOffset + 2]) << 8) | tmp[6 * bufConnOffset + 3];
firstPage = ((long)(tmp[6 * bufConnOffset + 4]) << 8) | tmp[6 * bufConnOffset + 5];
if (finalTid == 0xFFFFFFFF){
WARN_MSG("Buffer has declined incoming track %d", tid);
memset(tmp + 6 * bufConnOffset, 0, 6);
return;
}
INFO_MSG("Buffer has indicated that incoming track %d should start writing on track %d, page %d", tid, finalTid, firstPage);
memset(pageName, 0, 100);
sprintf(pageName, "%s%d_%d", streamName.c_str(), finalTid, firstPage);
curPages[finalTid].init(pageName, DEFAULT_DATA_PAGE_SIZE);
trackMap[tid] = finalTid;
bookKeeping[finalTid] = DTSCPageData();
}
void Output::negotiatePushTracks() {
int i = 0;
for (std::map<unsigned int, DTSC::Track>::iterator it = meta_out.tracks.begin(); it != meta_out.tracks.end() && i < 5; it++){
negotiateWithBuffer(it->first);
i++;
}
}
void Output::bufferPacket(JSON::Value & pack){
if (!pack["trackid"].asInt()){return;}
if (myMeta.tracks[pack["trackid"].asInt()].type != "video"){
if ((pack["time"].asInt() - bookKeeping[trackMap[pack["trackid"].asInt()]].lastKeyTime) >= 5000){
pack["keyframe"] = 1LL;
bookKeeping[trackMap[pack["trackid"].asInt()]].lastKeyTime = pack["time"].asInt();
}
}
if (pack["trackid"].asInt() == 0){
return;
}
//Re-negotiate declined tracks on each keyframe, to compensate for "broken" tracks
if (!trackMap.count(pack["trackid"].asInt()) || !trackMap[pack["trackid"].asInt()]){
if (pack.isMember("keyframe") && pack["keyframe"]){
negotiateWithBuffer(pack["trackid"].asInt());
}
}
if (!trackMap.count(pack["trackid"].asInt()) || !trackMap[pack["trackid"].asInt()]){
//declined track;
return;
}
pack["trackid"] = trackMap[pack["trackid"].asInt()];
long long unsigned int tNum = pack["trackid"].asInt();
if (!bookKeeping.count(tNum)){
return;
}
int pageNum = bookKeeping[tNum].pageNum;
std::string tmp = pack.toNetPacked();
if (bookKeeping[tNum].curOffset > FLIP_DATA_PAGE_SIZE && pack.isMember("keyframe") && pack["keyframe"]){
//open new page
char nextPage[100];
sprintf(nextPage, "%s%llu_%d", streamName.c_str(), tNum, bookKeeping[tNum].pageNum + bookKeeping[tNum].keyNum);
INFO_MSG("Continuing track %llu on page %d, from pos %llu", tNum, bookKeeping[tNum].pageNum + bookKeeping[tNum].keyNum, bookKeeping[tNum].curOffset);
curPages[tNum].init(nextPage, DEFAULT_DATA_PAGE_SIZE);
bookKeeping[tNum].pageNum += bookKeeping[tNum].keyNum;
bookKeeping[tNum].keyNum = 0;
bookKeeping[tNum].curOffset = 0;
}
if (!curPages[tNum].mapped){
//prevent page init failures from crashing everything.
myConn.close();//closes the connection to trigger a clean shutdown
return;
}
if (bookKeeping[tNum].curOffset + tmp.size() < (unsigned long long)curPages[tNum].len){
bookKeeping[tNum].keyNum += (pack.isMember("keyframe") && pack["keyframe"]);
memcpy(curPages[tNum].mapped + bookKeeping[tNum].curOffset, tmp.data(), tmp.size());
bookKeeping[tNum].curOffset += tmp.size();
}else{
bookKeeping[tNum].curOffset += tmp.size();
DEBUG_MSG(DLVL_WARN, "Can't buffer frame on page %d, track %llu, time %lld, keyNum %d, offset %llu", pageNum, tNum, pack["time"].asInt(), bookKeeping[tNum].pageNum + bookKeeping[tNum].keyNum, bookKeeping[tNum].curOffset);
}
}
void Output::initialize(){
if (isInitialized){
return;
}
if (streamIndex.mapped){
if (metaPages[0].mapped){
return;
}
if (streamName.size() < 1){
@ -240,14 +102,20 @@ namespace Mist {
return;
}
isInitialized = true;
streamIndex.init(streamName, DEFAULT_META_PAGE_SIZE);
if (!streamIndex.mapped){
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE);
if (!metaPages[0].mapped){
DEBUG_MSG(DLVL_FAIL, "Could not connect to server for %s\n", streamName.c_str());
onFail();
return;
}
statsPage = IPC::sharedClient("statistics", STAT_EX_SIZE, true);
playerConn = IPC::sharedClient(streamName + "_users", PLAY_EX_SIZE , true);
statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
if (!userClient.getData()){
userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
}
updateMeta();
selectDefaultTracks();
sought = false;
@ -259,14 +127,14 @@ namespace Mist {
return;
}
//check which tracks don't actually exist
std::set<long unsigned int> toRemove;
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
std::set<unsigned long> toRemove;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (!myMeta.tracks.count(*it)){
toRemove.insert(*it);
}
}
//remove those from selectedtracks
for (std::set<long unsigned int>::iterator it = toRemove.begin(); it != toRemove.end(); it++){
for (std::set<unsigned long>::iterator it = toRemove.begin(); it != toRemove.end(); it++){
selectedTracks.erase(*it);
}
@ -282,7 +150,7 @@ namespace Mist {
if ((*itb).size() > 0){
bool found = false;
for (JSON::ArrIter itc = (*itb).ArrBegin(); itc != (*itb).ArrEnd() && !found; itc++){
for (std::set<long unsigned int>::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){
for (std::set<unsigned long>::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){
if (myMeta.tracks[*itd].codec == (*itc).asStringRef()){
selCounter++;
found = true;
@ -321,12 +189,10 @@ namespace Mist {
if ((*itb).size() && myMeta.tracks.size()){
bool found = false;
for (JSON::ArrIter itc = (*itb).ArrBegin(); itc != (*itb).ArrEnd() && !found; itc++){
if (selectedTracks.size()){
for (std::set<long unsigned int>::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){
if (myMeta.tracks[*itd].codec == (*itc).asStringRef()){
found = true;
break;
}
for (std::set<unsigned long>::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){
if (myMeta.tracks[*itd].codec == (*itc).asStringRef()){
found = true;
break;
}
}
if (!found){
@ -385,17 +251,17 @@ namespace Mist {
}
int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){
if (!indexPages.count(trackId)){
char id[100];
sprintf(id, "%s%lu", streamName.c_str(), trackId);
indexPages[trackId].init(id, 8 * 1024);
if (!metaPages.count(trackId)){
char id[NAME_BUFFER_SIZE];
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), trackId);
metaPages[trackId].init(id, 8 * 1024);
}
char * mpd = indexPages[trackId].mapped;
int len = indexPages[trackId].len / 8;
int len = metaPages[trackId].len / 8;
for (int i = 0; i < len; i++){
long amountKey = ntohl((((long long int*)mpd)[i]) & 0xFFFFFFFF);
int * tmpOffset = (int *)(metaPages[trackId].mapped + (i * 8));
long amountKey = ntohl(tmpOffset[1]);
if (amountKey == 0){continue;}
long tmpKey = ntohl(((((long long int*)mpd)[i]) >> 32) & 0xFFFFFFFF);
long tmpKey = ntohl(tmpOffset[0]);
if (tmpKey <= keyNum && ((tmpKey?tmpKey:1) + amountKey) > keyNum){
return tmpKey;
}
@ -405,20 +271,20 @@ namespace Mist {
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){
curPages.erase(trackId);
curPage.erase(trackId);
currKeyOpen.erase(trackId);
return;
}
DEBUG_MSG(DLVL_HIGH, "Loading track %lu, containing key %lld", trackId, keyNum);
unsigned int timeout = 0;
int pageNum = pageNumForKey(trackId, keyNum);
unsigned long pageNum = pageNumForKey(trackId, keyNum);
while (pageNum == -1){
if (!timeout){
DEBUG_MSG(DLVL_DEVEL, "Requesting/waiting for page that has key %lu:%lld...", trackId, keyNum);
}
if (timeout++ > 100){
DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page. Aborting.");
curPages.erase(trackId);
curPage.erase(trackId);
currKeyOpen.erase(trackId);
return;
}
@ -443,11 +309,11 @@ namespace Mist {
if (currKeyOpen.count(trackId) && currKeyOpen[trackId] == (unsigned int)pageNum){
return;
}
char id[100];
snprintf(id, 100, "%s%lu_%d", streamName.c_str(), trackId, pageNum);
curPages[trackId].init(id, DEFAULT_DATA_PAGE_SIZE);
if (!(curPages[trackId].mapped)){
DEBUG_MSG(DLVL_FAIL, "Initializing page %s failed", curPages[trackId].name.c_str());
char id[NAME_BUFFER_SIZE];
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackId, pageNum);
curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE);
if (!(curPage[trackId].mapped)){
DEBUG_MSG(DLVL_FAIL, "Initializing page %s failed", curPage[trackId].name.c_str());
return;
}
currKeyOpen[trackId] = pageNum;
@ -461,8 +327,10 @@ namespace Mist {
initialize();
}
buffer.clear();
currentPacket.null();
updateMeta();
thisPacket.null();
if (myMeta.live){
updateMeta();
}
DEBUG_MSG(DLVL_MEDIUM, "Seeking to %llums", pos);
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
seek(*it, pos);
@ -471,7 +339,7 @@ namespace Mist {
bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){
loadPageForKey(tid, getKeyForTime(tid, pos) + (getNextKey?1:0));
if (!curPages.count(tid) || !curPages[tid].mapped){
if (!curPage.count(tid) || !curPage[tid].mapped){
DEBUG_MSG(DLVL_DEVEL, "Aborting seek to %llums in track %u, not available.", pos, tid);
return false;
}
@ -479,9 +347,9 @@ namespace Mist {
tmp.tid = tid;
tmp.offset = 0;
DTSC::Packet tmpPack;
tmpPack.reInit(curPages[tid].mapped + tmp.offset, 0, true);
tmpPack.reInit(curPage[tid].mapped + tmp.offset, 0, true);
tmp.time = tmpPack.getTime();
char * mpd = curPages[tid].mapped;
char * mpd = curPage[tid].mapped;
while ((long long)tmp.time < pos && tmpPack){
tmp.offset += tmpPack.getDataLen();
tmpPack.reInit(mpd + tmp.offset, 0, true);
@ -492,15 +360,15 @@ namespace Mist {
return true;
}else{
//don't print anything for empty packets - not sign of corruption, just unfinished stream.
if (curPages[tid].mapped[tmp.offset] != 0){
if (curPage[tid].mapped[tmp.offset] != 0){
DEBUG_MSG(DLVL_FAIL, "Noes! Couldn't find packet on track %d because of some kind of corruption error or somesuch.", tid);
}else{
DEBUG_MSG(DLVL_FAIL, "Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset);
unsigned int i = 0;
while (curPages[tid].mapped[tmp.offset] == 0 && ++i < 42){
while (curPage[tid].mapped[tmp.offset] == 0 && ++i < 42){
Util::wait(100);
}
if (curPages[tid].mapped[tmp.offset] == 0){
if (curPage[tid].mapped[tmp.offset] == 0){
DEBUG_MSG(DLVL_FAIL, "Track %d no data (key %u) - timeout", tid, getKeyForTime(tid, pos) + (getNextKey?1:0));
}else{
return seek(tid, pos, getNextKey);
@ -539,7 +407,7 @@ namespace Mist {
sendHeader();
}
prepareNext();
if (currentPacket){
if (thisPacket){
sendNext();
}else{
if (!onFinish()){
@ -550,7 +418,7 @@ namespace Mist {
}
DEBUG_MSG(DLVL_MEDIUM, "MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data");
stats();
playerConn.finish();
userClient.finish();
statsPage.finish();
myConn.close();
return 0;
@ -575,7 +443,22 @@ namespace Mist {
if (!sought){
if (myMeta.live){
long unsigned int mainTrack = getMainSelectedTrack();
unsigned long long seekPos = myMeta.tracks[mainTrack].keys.begin()->getTime();
if (myMeta.tracks[mainTrack].keys.size() < 2){
if (!myMeta.tracks[mainTrack].keys.size()){
myConn.close();
return;
}else{
seek(myMeta.tracks[mainTrack].keys.begin()->getTime());
prepareNext();
return;
}
}
unsigned int skip = ((myMeta.tracks[mainTrack].keys.size()-1) * config->getInteger("startpos")) / 1000u;
std::deque<DTSC::Key>::iterator it = myMeta.tracks[mainTrack].keys.begin();
for (unsigned int i = 0; i < skip; ++i){
++it;
}
unsigned long long seekPos = it->getTime();
if (seekPos < 5000){
seekPos = 0;
}
@ -586,7 +469,7 @@ namespace Mist {
}
static unsigned int emptyCount = 0;
if (!buffer.size()){
currentPacket.null();
thisPacket.null();
DEBUG_MSG(DLVL_DEVEL, "Buffer completely played out");
onFinish();
return;
@ -596,15 +479,15 @@ namespace Mist {
DEBUG_MSG(DLVL_DONTEVEN, "Loading track %u (next=%lu), %llu ms", nxt.tid, nxtKeyNum[nxt.tid], nxt.time);
if (nxt.offset >= curPages[nxt.tid].len){
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, currentPacket.getTime());
if (nxt.offset >= curPage[nxt.tid].len){
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
nxt.offset = 0;
if (curPages.count(nxt.tid) && curPages[nxt.tid].mapped){
if (getDTSCTime(curPages[nxt.tid].mapped, nxt.offset) < nxt.time){
if (curPage.count(nxt.tid) && curPage[nxt.tid].mapped){
if (getDTSCTime(curPage[nxt.tid].mapped, nxt.offset) < nxt.time){
ERROR_MSG("Time going backwards in track %u - dropping track.", nxt.tid);
}else{
nxt.time = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset);
nxt.time = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset);
buffer.insert(nxt);
}
prepareNext();
@ -612,7 +495,7 @@ namespace Mist {
}
}
if (!curPages.count(nxt.tid) || !curPages[nxt.tid].mapped){
if (!curPage.count(nxt.tid) || !curPage[nxt.tid].mapped){
//mapping failure? Drop this track and go to next.
//not an error - usually means end of stream.
DEBUG_MSG(DLVL_DEVEL, "Track %u no page - dropping track.", nxt.tid);
@ -621,7 +504,7 @@ namespace Mist {
}
//have we arrived at the end of the memory page? (4 zeroes mark the end)
if (!memcmp(curPages[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4)){
if (!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4)){
//if we don't currently know where we are, we're lost. We should drop the track.
if (!nxt.time){
DEBUG_MSG(DLVL_DEVEL, "Timeless empty packet on track %u - dropping track.", nxt.tid);
@ -645,11 +528,11 @@ namespace Mist {
updateMeta();
}else{
//if we're not live, we've simply reached the end of the page. Load the next key.
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, currentPacket.getTime());
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
nxt.offset = 0;
if (curPages.count(nxt.tid) && curPages[nxt.tid].mapped){
unsigned long long nextTime = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset);
if (curPage.count(nxt.tid) && curPage[nxt.tid].mapped){
unsigned long long nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset);
if (nextTime && nextTime < nxt.time){
DEBUG_MSG(DLVL_DEVEL, "Time going backwards in track %u - dropping track.", nxt.tid);
}else{
@ -666,29 +549,29 @@ namespace Mist {
prepareNext();
return;
}
currentPacket.reInit(curPages[nxt.tid].mapped + nxt.offset, 0, true);
if (currentPacket){
if (currentPacket.getTime() != nxt.time && nxt.time){
DEBUG_MSG(DLVL_MEDIUM, "ACTUALLY Loaded track %ld (next=%lu), %llu ms", currentPacket.getTrackId(), nxtKeyNum[nxt.tid], currentPacket.getTime());
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
if (thisPacket){
if (thisPacket.getTime() != nxt.time && nxt.time){
DEBUG_MSG(DLVL_MEDIUM, "ACTUALLY Loaded track %ld (next=%lu), %llu ms", thisPacket.getTrackId(), nxtKeyNum[nxt.tid], thisPacket.getTime());
}
if ((myMeta.tracks[nxt.tid].type == "video" && currentPacket.getFlag("keyframe")) || (++nonVideoCount % 30 == 0)){
if ((myMeta.tracks[nxt.tid].type == "video" && thisPacket.getFlag("keyframe")) || (++nonVideoCount % 30 == 0)){
if (myMeta.live){
updateMeta();
}
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, currentPacket.getTime());
DEBUG_MSG(DLVL_VERYHIGH, "Track %u @ %llums = key %lu", nxt.tid, currentPacket.getTime(), nxtKeyNum[nxt.tid]);
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
DEBUG_MSG(DLVL_VERYHIGH, "Track %u @ %llums = key %lu", nxt.tid, thisPacket.getTime(), nxtKeyNum[nxt.tid]);
}
emptyCount = 0;
}
nxt.offset += currentPacket.getDataLen();
nxt.offset += thisPacket.getDataLen();
if (realTime){
while (nxt.time > (Util::getMS() - firstTime + maxSkipAhead)*1000/realTime) {
Util::sleep(nxt.time - (Util::getMS() - firstTime + minSkipAhead)*1000/realTime);
}
}
if (curPages[nxt.tid]){
if (nxt.offset < curPages[nxt.tid].len){
unsigned long long nextTime = getDTSCTime(curPages[nxt.tid].mapped, nxt.offset);
if (curPage[nxt.tid]){
if (nxt.offset < curPage[nxt.tid].len){
unsigned long long nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset);
if (nextTime){
nxt.time = nextTime;
}else{
@ -721,8 +604,8 @@ namespace Mist {
tmpEx.up(myConn.dataUp());
tmpEx.down(myConn.dataDown());
tmpEx.time(now - myConn.connTime());
if (currentPacket){
tmpEx.lastSecond(currentPacket.getTime());
if (thisPacket){
tmpEx.lastSecond(thisPacket.getTime());
}else{
tmpEx.lastSecond(0);
}
@ -730,18 +613,20 @@ namespace Mist {
}
}
int tNum = 0;
if (!playerConn.getData()){
playerConn = IPC::sharedClient(streamName + "_users", PLAY_EX_SIZE, true);
if (!playerConn.getData()){
if (!userClient.getData()){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
if (!userClient.getData()){
DEBUG_MSG(DLVL_WARN, "Player connection failure - aborting output");
myConn.close();
return;
}
}
if (trackMap.size()){
for (std::map<int, int>::iterator it = trackMap.begin(); it != trackMap.end() && tNum < 5; it++){
for (std::map<unsigned long, unsigned long>::iterator it = trackMap.begin(); it != trackMap.end() && tNum < 5; it++){
unsigned int tId = it->second;
char * thisData = playerConn.getData() + (6 * tNum);
char * thisData = userClient.getData() + (6 * tNum);
thisData[0] = ((tId >> 24) & 0xFF);
thisData[1] = ((tId >> 16) & 0xFF);
thisData[2] = ((tId >> 8) & 0xFF);
@ -753,7 +638,7 @@ namespace Mist {
}else{
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < 5; it++){
unsigned int tId = *it;
char * thisData = playerConn.getData() + (6 * tNum);
char * thisData = userClient.getData() + (6 * tNum);
thisData[0] = ((tId >> 24) & 0xFF);
thisData[1] = ((tId >> 16) & 0xFF);
thisData[2] = ((tId >> 8) & 0xFF);
@ -763,7 +648,7 @@ namespace Mist {
tNum ++;
}
}
playerConn.keepAlive();
userClient.keepAlive();
if (tNum >= 5){
DEBUG_MSG(DLVL_WARN, "Too many tracks selected, using only first 5");
}
@ -779,5 +664,4 @@ namespace Mist {
//just set the sentHeader bool to true, by default
sentHeader = true;
}
}

View file

@ -9,6 +9,7 @@
#include <mist/dtsc.h>
#include <mist/socket.h>
#include <mist/shared_memory.h>
#include "../io.h"
namespace Mist {
@ -26,24 +27,13 @@ namespace Mist {
unsigned int offset;
};
struct DTSCPageData {
DTSCPageData() : pageNum(0), keyNum(0), partNum(0), dataSize(0), curOffset(0), firstTime(0), lastKeyTime(-5000){}
int pageNum;///<The current page number
int keyNum;///<The number of keyframes in this page.
int partNum;///<The number of parts in this page.
unsigned long long int dataSize;///<The full size this page should be.
unsigned long long int curOffset;///<The current write offset in the page.
unsigned long long int firstTime;///<The first timestamp of the page.
long long int lastKeyTime;///<The time of the last keyframe of the page.
};
/// The output class is intended to be inherited by MistOut process classes.
/// The output class is intended to be inherited by MistOut process classes.
/// It contains all generic code and logic, while the child classes implement
/// anything specific to particular protocols or containers.
/// It contains several virtual functions, that may be overridden to "hook" into
/// the streaming process at those particular points, simplifying child class
/// logic and implementation details.
class Output {
class Output : public InOutBase {
public:
//constructor and destructor
Output(Socket::Connection & conn);
@ -66,7 +56,9 @@ namespace Mist {
virtual void sendNext() {}//REQUIRED! Others are optional.
virtual void prepareNext();
virtual void onRequest();
virtual bool onFinish(){return false;}
virtual bool onFinish() {
return false;
}
virtual void initialize();
virtual void sendHeader();
virtual void onFail();
@ -85,14 +77,6 @@ namespace Mist {
bool isBlocking;///< If true, indicates that myConn is blocking.
unsigned int crc;///< Checksum, if any, for usage in the stats.
unsigned int getKeyForTime(long unsigned int trackId, long long timeStamp);
IPC::sharedPage streamIndex;///< Shared memory used for metadata
std::map<int,IPC::sharedPage> indexPages;///< Maintains index pages of each track, holding information about available pages with DTSC packets.
std::map<int,IPC::sharedPage> curPages;///< Holds the currently used pages with DTSC packets for each track.
/// \todo Privitize keyTimes
IPC::sharedClient playerConn;///< Shared memory used for connection to MistIn process.
std::map<int,std::set<int> > keyTimes;///< Per-track list of keyframe times, for keyframe detection.
//static member for initialization
static Util::Config * config;///< Static, global configuration for the MistOut process
//stream delaying variables
unsigned int maxSkipAhead;///< Maximum ms that we will go ahead of the intended timestamps.
@ -101,26 +85,14 @@ namespace Mist {
//Read/write status variables
Socket::Connection & myConn;///< Connection to the client.
std::string streamName;///< Name of the stream that will be opened by initialize()
std::set<unsigned long> selectedTracks; ///< Tracks that are selected for playback
bool wantRequest;///< If true, waits for a request.
bool parseData;///< If true, triggers initalization if not already done, sending of header, sending of packets.
bool isInitialized;///< If false, triggers initialization if parseData is true.
bool sentHeader;///< If false, triggers sendHeader if parseData is true.
//Read-only stream data variables
DTSC::Packet currentPacket;///< The packet that is ready for sending now.
DTSC::Meta myMeta;///< Up to date stream metadata
//For pushing data through an output into the buffer process
void negotiateWithBuffer(int tid);
void negotiatePushTracks();
void bufferPacket(JSON::Value & pack);
DTSC::Meta meta_out;
std::deque<JSON::Value> preBuf;
std::map<int,int> trackMap;
std::map<int,DTSCPageData> bookKeeping;
};

View file

@ -163,14 +163,14 @@ namespace Mist {
}
void OutHDS::sendNext(){
if (currentPacket.getTime() >= playUntil){
if (thisPacket.getTime() >= playUntil){
DEBUG_MSG(DLVL_HIGH, "(%d) Done sending fragment", getpid() );
stop();
wantRequest = true;
H.Chunkify("", 0, myConn);
return;
}
tag.DTSCLoader(currentPacket, myMeta.tracks[currentPacket.getTrackId()]);
tag.DTSCLoader(thisPacket, myMeta.tracks[thisPacket.getTrackId()]);
H.Chunkify(tag.data, tag.len, myConn);
}

View file

@ -65,7 +65,7 @@ namespace Mist {
}
void OutHSS::sendNext() {
if (currentPacket.getTime() >= playUntil) {
if (thisPacket.getTime() >= playUntil) {
stop();
wantRequest = true;
H.Chunkify("", 0, myConn);
@ -74,7 +74,7 @@ namespace Mist {
}
char * dataPointer = 0;
unsigned int len = 0;
currentPacket.getString("data", dataPointer, len);
thisPacket.getString("data", dataPointer, len);
H.Chunkify(dataPointer, len, myConn);
}
@ -149,7 +149,13 @@ namespace Mist {
}
}
seek(seekTime);
playUntil = (*(keyTimes[tid].upper_bound(seekTime)));
///\todo Rewrite to fragments
for (std::deque<DTSC::Key>::iterator it2 = myMeta.tracks[tid].keys.begin(); it2 != myMeta.tracks[tid].keys.end(); it2++) {
if (it2->getTime() > seekTime){
playUntil = it2->getTime();
break;
}
}
myTrackStor = tid;
myKeyStor = seekTime;
keysToSend = 1;
@ -450,11 +456,6 @@ namespace Mist {
void OutHSS::initialize() {
Output::initialize();
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
for (std::deque<DTSC::Key>::iterator it2 = it->second.keys.begin(); it2 != it->second.keys.end(); it2++) {
keyTimes[it->first].insert(it2->getTime());
}
}
}
}

View file

@ -9,7 +9,7 @@ namespace Mist {
static void init(Util::Config * cfg);
void onHTTP();
void sendNext();
void initialize();
void initialize();/*LTS*/
void sendHeader();
protected:
JSON::Value encryption;

View file

@ -167,7 +167,7 @@ namespace Mist {
if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName){
DEBUG_MSG(DLVL_MEDIUM, "Switching from %s (%s) to %s (%s)", capa["name"].asStringRef().c_str(), streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str());
streamName = H.GetVar("stream");
playerConn.finish();
userClient.finish();
statsPage.finish();
reConnector(handler);
H.Clean();

View file

@ -26,7 +26,7 @@ namespace Mist {
}
first = false;
}
myConn.SendNow(currentPacket.toJSON().toString());
myConn.SendNow(thisPacket.toJSON().toString());
}
void OutJSON::sendHeader(){

View file

@ -31,7 +31,7 @@ namespace Mist {
}
void OutProgressiveFLV::sendNext(){
tag.DTSCLoader(currentPacket, myMeta.tracks[currentPacket.getTrackId()]);
tag.DTSCLoader(thisPacket, myMeta.tracks[thisPacket.getTrackId()]);
myConn.SendNow(tag.data, tag.len);
}

View file

@ -19,7 +19,7 @@ namespace Mist {
void OutProgressiveMP3::sendNext(){
char * dataPointer = 0;
unsigned int len = 0;
currentPacket.getString("data", dataPointer, len);
thisPacket.getString("data", dataPointer, len);
myConn.SendNow(dataPointer, len);
}

View file

@ -460,16 +460,16 @@ namespace Mist {
static bool perfect = true;
char * dataPointer = 0;
unsigned int len = 0;
currentPacket.getString("data", dataPointer, len);
if ((unsigned long)currentPacket.getTrackId() != sortSet.begin()->trackID || currentPacket.getTime() != sortSet.begin()->time){
if (currentPacket.getTime() >= sortSet.begin()->time || (unsigned long)currentPacket.getTrackId() >= sortSet.begin()->trackID){
thisPacket.getString("data", dataPointer, len);
if ((unsigned long)thisPacket.getTrackId() != sortSet.begin()->trackID || thisPacket.getTime() != sortSet.begin()->time){
if (thisPacket.getTime() >= sortSet.begin()->time || (unsigned long)thisPacket.getTrackId() >= sortSet.begin()->trackID){
if (perfect){
DEBUG_MSG(DLVL_WARN, "Warning: input is inconsistent. Expected %lu:%llu but got %ld:%llu - cancelling playback", sortSet.begin()->trackID, sortSet.begin()->time, currentPacket.getTrackId(), currentPacket.getTime());
DEBUG_MSG(DLVL_WARN, "Warning: input is inconsistent. Expected %lu:%llu but got %ld:%llu - cancelling playback", sortSet.begin()->trackID, sortSet.begin()->time, thisPacket.getTrackId(), thisPacket.getTime());
perfect = false;
myConn.close();
}
}else{
DEBUG_MSG(DLVL_HIGH, "Did not receive expected %lu:%llu but got %ld:%llu - throwing it away", sortSet.begin()->trackID, sortSet.begin()->time, currentPacket.getTrackId(), currentPacket.getTime());
DEBUG_MSG(DLVL_HIGH, "Did not receive expected %lu:%llu but got %ld:%llu - throwing it away", sortSet.begin()->trackID, sortSet.begin()->time, thisPacket.getTrackId(), thisPacket.getTime());
}
return;
}

View file

@ -27,43 +27,31 @@ namespace Mist {
}
void OutProgressiveOGG::sendNext(){
unsigned int track = currentPacket.getTrackId();
unsigned int track = thisPacket.getTrackId();
OGG::oggSegment newSegment;
currentPacket.getString("data", newSegment.dataString);
// if (currentPacket.getTime() > 315800){// && currentPacket.getTime() < 316200){
//INFO_MSG("Found a packet of time %llu, size: %d", currentPacket.getTime(), newSegment.dataString.size());
//}
pageBuffer[track].totalFrames = ((double)currentPacket.getTime() / (1000000.0f / myMeta.tracks[track].fpks)) + 1.5; //should start at 1. added .5 for rounding.
// INFO_MSG("track: %u totalFrames %llu timestamp: %llu totalframe value: %f", track, pageBuffer[track].totalFrames, currentPacket.getTime(), ((double)currentPacket.getTime() / (1000000.0f / myMeta.tracks[track].fpks)) + 1);
thisPacket.getString("data", newSegment.dataString);
pageBuffer[track].totalFrames = ((double)thisPacket.getTime() / (1000000.0f / myMeta.tracks[track].fpks)) + 1.5; //should start at 1. added .5 for rounding.
if (pageBuffer[track].codec == OGG::THEORA){
newSegment.isKeyframe = currentPacket.getFlag("keyframe");
newSegment.isKeyframe = thisPacket.getFlag("keyframe");
if (newSegment.isKeyframe == true){
pageBuffer[track].sendTo(myConn);//send data remaining in buffer (expected to fit on a page), keyframe will allways start on new page
// INFO_MSG("segments left in buffer: %d", pageBuffer[track].oggSegments.size());
pageBuffer[track].lastKeyFrame = pageBuffer[track].totalFrames;
}
newSegment.framesSinceKeyFrame = pageBuffer[track].totalFrames - pageBuffer[track].lastKeyFrame;
newSegment.lastKeyFrameSeen = pageBuffer[track].lastKeyFrame;
// theora::frame tmpFrame;
// tmpFrame.read(newSegment.dataString.data(),newSegment.dataString.size());
// INFO_MSG("FTYPE: %d ISKEYFRAME: %d",tmpFrame.getFTYPE(),newSegment.isKeyframe );
}
newSegment.frameNumber = pageBuffer[track].totalFrames;
newSegment.timeStamp = currentPacket.getTime();
newSegment.timeStamp = thisPacket.getTime();
pageBuffer[track].oggSegments.push_back(newSegment);
if (pageBuffer[track].codec == OGG::VORBIS){
pageBuffer[track].vorbisStuff();//this updates lastKeyFrame
}
// while (pageBuffer[track].oggSegments.size()){
//pageBuffer[track].sendTo(myConn);
//}
while (pageBuffer[track].shouldSend()){
pageBuffer[track].sendTo(myConn);
}

View file

@ -59,7 +59,7 @@ namespace Mist {
}
void OutRaw::sendNext(){
myConn.SendNow(currentPacket.getData(), currentPacket.getDataLen());
myConn.SendNow(thisPacket.getData(), thisPacket.getDataLen());
}
void OutRaw::sendHeader(){

View file

@ -89,7 +89,7 @@ namespace Mist {
pos = nextpos + 1;
}
if (trackSwitch){
seek(currentPacket.getTime());
seek(thisPacket.getTime());
}
}
@ -133,8 +133,8 @@ namespace Mist {
unsigned int dheader_len = 1;
char * tmpData = 0;//pointer to raw media data
unsigned int data_len = 0;//length of processed media data
currentPacket.getString("data", tmpData, data_len);
DTSC::Track & track = myMeta.tracks[currentPacket.getTrackId()];
thisPacket.getString("data", tmpData, data_len);
DTSC::Track & track = myMeta.tracks[thisPacket.getTrackId()];
//set msg_type_id
if (track.type == "video"){
@ -143,8 +143,8 @@ namespace Mist {
dheader_len += 4;
dataheader[0] = 7;
dataheader[1] = 1;
if (currentPacket.getInt("offset") > 0){
long long offset = currentPacket.getInt("offset");
if (thisPacket.getInt("offset") > 0){
long long offset = thisPacket.getInt("offset");
dataheader[2] = (offset >> 16) & 0xFF;
dataheader[3] = (offset >> 8) & 0xFF;
dataheader[4] = offset & 0xFF;
@ -153,12 +153,12 @@ namespace Mist {
if (track.codec == "H263"){
dataheader[0] = 2;
}
if (currentPacket.getFlag("keyframe")){
if (thisPacket.getFlag("keyframe")){
dataheader[0] |= 0x10;
}else{
dataheader[0] |= 0x20;
}
if (currentPacket.getFlag("disposableframe")){
if (thisPacket.getFlag("disposableframe")){
dataheader[0] |= 0x30;
}
}
@ -189,7 +189,7 @@ namespace Mist {
}
data_len += dheader_len;
unsigned int timestamp = currentPacket.getTime();
unsigned int timestamp = thisPacket.getTime();
bool allow_short = RTMPStream::lastsend.count(4);
RTMPStream::Chunk & prev = RTMPStream::lastsend[4];
@ -808,22 +808,43 @@ namespace Mist {
F.ChunkLoader(next);
JSON::Value pack_out = F.toJSON(meta_out);
if ( !pack_out.isNull()){
//Check for backwards timestamps
if (pack_out["time"].asInt() < meta_out.tracks[pack_out["trackid"].asInt()].lastms){
///Reset all internals
sending = false;
counter = 0;
preBuf.clear();
meta_out = DTSC::Meta();
pack_out = F.toJSON(meta_out);//Reinitialize the metadata with this packet.
///Reset negotiation with buffer
userClient.finish();
userClient = IPC::sharedClient(streamName + "_users", PLAY_EX_SIZE, true);
}
if ( !sending){
counter++;
if (counter > 8){
sending = true;
myMeta = meta_out;
negotiatePushTracks();
if (!userClient.getData()){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
userClient = IPC::sharedClient(userPageName, 30, true);
}
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
DEBUG_MSG(DLVL_MEDIUM, "Starting negotiation for track %d", it->first);
continueNegotiate(it->first);
}
//negotiatePushTracks();
for (std::deque<JSON::Value>::iterator it = preBuf.begin(); it != preBuf.end(); it++){
bufferPacket((*it));
bufferLivePacket((*it));
}
preBuf.clear(); //clear buffer
bufferPacket(pack_out);
bufferLivePacket(pack_out);
}else{
preBuf.push_back(pack_out);
}
}else{
bufferPacket(pack_out);
bufferLivePacket(pack_out);
}
}
break;

View file

@ -25,17 +25,17 @@ namespace Mist {
void OutProgressiveSRT::sendNext(){
char * dataPointer = 0;
unsigned int len = 0;
currentPacket.getString("data", dataPointer, len);
thisPacket.getString("data", dataPointer, len);
std::stringstream tmp;
if(!webVTT) {
tmp << lastNum++ << std::endl;
}
long long unsigned int time = currentPacket.getTime();
long long unsigned int time = thisPacket.getTime();
char tmpBuf[50];
int tmpLen = sprintf(tmpBuf, "%.2llu:%.2llu:%.2llu,%.3llu", (time / 3600000), ((time % 3600000) / 60000), (((time % 3600000) % 60000) / 1000), time % 1000);
tmp.write(tmpBuf, tmpLen);
tmp << " --> ";
time += currentPacket.getInt("duration");
time += thisPacket.getInt("duration");
tmpLen = sprintf(tmpBuf, "%.2llu:%.2llu:%.2llu,%.3llu", (time / 3600000), ((time % 3600000) / 60000), (((time % 3600000) % 60000) / 1000), time % 1000);
tmp.write(tmpBuf, tmpLen);
tmp << std::endl;

View file

@ -31,18 +31,18 @@ namespace Mist {
if (packData.getBytesFree() == 184){
packData.clear();
packData.setPID(0x100 - 1 + currentPacket.getTrackId());
packData.setPID(0x100 - 1 + thisPacket.getTrackId());
packData.setContinuityCounter(++contCounters[packData.getPID()]);
if (first[currentPacket.getTrackId()]){
if (first[thisPacket.getTrackId()]){
packData.setUnitStart(1);
packData.setDiscontinuity(true);
if (myMeta.tracks[currentPacket.getTrackId()].type == "video"){
if (currentPacket.getInt("keyframe")){
if (myMeta.tracks[thisPacket.getTrackId()].type == "video"){
if (thisPacket.getInt("keyframe")){
packData.setRandomAccess(1);
}
packData.setPCR(currentPacket.getTime() * 27000);
packData.setPCR(thisPacket.getTime() * 27000);
}
first[currentPacket.getTrackId()] = false;
first[thisPacket.getTrackId()] = false;
}
}
@ -53,11 +53,11 @@ namespace Mist {
}
void TSOutput::sendNext(){
first[currentPacket.getTrackId()] = true;
first[thisPacket.getTrackId()] = true;
char * dataPointer = 0;
unsigned int dataLen = 0;
currentPacket.getString("data", dataPointer, dataLen); //data
if (currentPacket.getTime() >= until){ //this if should only trigger for HLS
thisPacket.getString("data", dataPointer, dataLen); //data
if (thisPacket.getTime() >= until){ //this if should only trigger for HLS
stop();
wantRequest = true;
parseData = false;
@ -66,16 +66,16 @@ namespace Mist {
}
std::string bs;
//prepare bufferstring
if (myMeta.tracks[currentPacket.getTrackId()].type == "video"){
if (myMeta.tracks[thisPacket.getTrackId()].type == "video"){
unsigned int extraSize = 0;
//dataPointer[4] & 0x1f is used to check if this should be done later: fillPacket("\000\000\000\001\011\360", 6);
if (myMeta.tracks[currentPacket.getTrackId()].codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){
if (myMeta.tracks[thisPacket.getTrackId()].codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){
extraSize += 6;
}
if (currentPacket.getInt("keyframe")){
if (myMeta.tracks[currentPacket.getTrackId()].codec == "H264"){
if (thisPacket.getInt("keyframe")){
if (myMeta.tracks[thisPacket.getTrackId()].codec == "H264"){
if (!haveAvcc){
avccbox.setPayload(myMeta.tracks[currentPacket.getTrackId()].init);
avccbox.setPayload(myMeta.tracks[thisPacket.getTrackId()].init);
haveAvcc = true;
}
bs = avccbox.asAnnexB();
@ -91,16 +91,16 @@ namespace Mist {
while (currPack <= splitCount){
unsigned int alreadySent = 0;
bs = TS::Packet::getPESVideoLeadIn((currPack != splitCount ? watKunnenWeIn1Ding : dataLen+extraSize - currPack*watKunnenWeIn1Ding), currentPacket.getTime() * 90, currentPacket.getInt("offset") * 90, !currPack);
bs = TS::Packet::getPESVideoLeadIn((currPack != splitCount ? watKunnenWeIn1Ding : dataLen+extraSize - currPack*watKunnenWeIn1Ding), thisPacket.getTime() * 90, thisPacket.getInt("offset") * 90, !currPack);
fillPacket(bs.data(), bs.size());
if (!currPack){
if (myMeta.tracks[currentPacket.getTrackId()].codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){
if (myMeta.tracks[thisPacket.getTrackId()].codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){
//End of previous nal unit, if not already present
fillPacket("\000\000\000\001\011\360", 6);
alreadySent += 6;
}
if (currentPacket.getInt("keyframe")){
if (myMeta.tracks[currentPacket.getTrackId()].codec == "H264"){
if (thisPacket.getInt("keyframe")){
if (myMeta.tracks[thisPacket.getTrackId()].codec == "H264"){
bs = avccbox.asAnnexB();
fillPacket(bs.data(), bs.size());
alreadySent += bs.size();
@ -137,32 +137,32 @@ namespace Mist {
if (alreadySent == watKunnenWeIn1Ding){
packData.addStuffing();
fillPacket(0, 0);
first[currentPacket.getTrackId()] = true;
first[thisPacket.getTrackId()] = true;
break;
}
}
currPack++;
}
}else if (myMeta.tracks[currentPacket.getTrackId()].type == "audio"){
}else if (myMeta.tracks[thisPacket.getTrackId()].type == "audio"){
long unsigned int tempLen = dataLen;
if ( myMeta.tracks[currentPacket.getTrackId()].codec == "AAC"){
if ( myMeta.tracks[thisPacket.getTrackId()].codec == "AAC"){
tempLen += 7;
}
long long unsigned int tempTime;
if (appleCompat){
tempTime = 0;// myMeta.tracks[currentPacket.getTrackId()].rate / 1000;
tempTime = 0;// myMeta.tracks[thisPacket.getTrackId()].rate / 1000;
}else{
tempTime = currentPacket.getTime() * 90;
tempTime = thisPacket.getTime() * 90;
}
///\todo stuur 70ms aan audio per PES pakket om applecompat overbodig te maken.
//static unsigned long long lastSent=currentPacket.getTime() * 90;
//if( (currentPacket.getTime() * 90)-lastSent >= 70*90 ){
// lastSent=(currentPacket.getTime() * 90);
//static unsigned long long lastSent=thisPacket.getTime() * 90;
//if( (thisPacket.getTime() * 90)-lastSent >= 70*90 ){
// lastSent=(thisPacket.getTime() * 90);
//}
bs = TS::Packet::getPESAudioLeadIn(tempLen, tempTime);// myMeta.tracks[currentPacket.getTrackId()].rate / 1000 );
bs = TS::Packet::getPESAudioLeadIn(tempLen, tempTime);// myMeta.tracks[thisPacket.getTrackId()].rate / 1000 );
fillPacket(bs.data(), bs.size());
if (myMeta.tracks[currentPacket.getTrackId()].codec == "AAC"){
bs = TS::getAudioHeader(dataLen, myMeta.tracks[currentPacket.getTrackId()].init);
if (myMeta.tracks[thisPacket.getTrackId()].codec == "AAC"){
bs = TS::getAudioHeader(dataLen, myMeta.tracks[thisPacket.getTrackId()].init);
fillPacket(bs.data(), bs.size());
}
fillPacket(dataPointer,dataLen);