Recording, HLS Push, UDP (Multicast) Input, Threaded TS

This commit is contained in:
Erik Zandvliet 2016-01-28 15:00:25 +01:00 committed by Thulinma
parent 1c3e143709
commit c25a533729
29 changed files with 1809 additions and 815 deletions

View file

@ -1,101 +0,0 @@
macro(makeAnalyser analyserName format)
add_executable( MistAnalyser${analyserName} analysers/${format}_analyser.cpp )
target_link_libraries( MistAnalyser${analyserName} mist )
endmacro()
macro(makeInput inputName format)
add_executable( MistIn${inputName} input/mist_in.cpp input/input.cpp input/input_${format}.cpp )
set_target_properties( MistIn${inputName} PROPERTIES COMPILE_DEFINITIONS INPUTTYPE=\"input_${format}.h\")
target_link_libraries( MistIn${inputName} mist )
endmacro()
macro(makeOutput outputName format)
#check if 'http' is one of the argyments, if yes, this is an http output
if (";${ARGN};" MATCHES ";http;")
SET(httpOutput output/output_http.cpp)
if (";${ARGN};" MATCHES ";ts;")
SET(tsBaseClass HTTPOutput)
else()
SET(tsBaseClass Output)
endif()
endif()
if (";${ARGN};" MATCHES ";ts;")
SET(tsOutput output/output_ts_base.cpp)
endif()
add_executable( MistOut${outputName} output/mist_out.cpp output/output.cpp ${httpOutput} ${tsOutput} output/output_${format}.cpp )
set_target_properties( MistOut${outputName} PROPERTIES COMPILE_DEFINITIONS "OUTPUTTYPE=\"output_${format}.h\";TS_BASECLASS=${tsBaseClass}")
target_link_libraries( MistOut${outputName} mist )
endmacro()
makeAnalyser(RTMP rtmp)
makeAnalyser(FLV flv)
makeAnalyser(DTSC dtsc)
makeAnalyser(AMF amf)
makeAnalyser(MP4 mp4)
makeAnalyser(OGG ogg)
makeInput(DTSC dtsc)
makeInput(MP3 mp3)
makeInput(FLV flv)
makeInput(OGG ogg)
makeInput(Buffer buffer)
makeOutput(RTMP rtmp)
makeOutput(OGG progressive_ogg http)
makeOutput(FLV progressive_flv http)
makeOutput(MP4 progressive_mp4 http)
makeOutput(MP3 progressive_mp3 http)
makeOutput(HSS hss http)
makeOutput(HDS hds http)
makeOutput(SRT srt http)
makeOutput(JSON json http)
makeOutput(TS ts ts)
makeOutput(HTTPTS httpts http ts)
makeOutput(HLS hls http ts)
#get the bitlength of this system
execute_process(COMMAND getconf LONG_BIT OUTPUT_VARIABLE RELEASE_RAW )
#strip off the trailing spaces and newline
string(STRIP ${RELEASE_RAW} RELEASE)
set(RELEASE \"${RELEASE}\" )
include_directories(${CMAKE_CURRENT_BINARY_DIR})
add_executable( sourcery sourcery.cpp )
add_custom_target( embedcode
ALL
./sourcery ${CMAKE_CURRENT_SOURCE_DIR}/embed.js embed_js ${CMAKE_CURRENT_BINARY_DIR}/embed.js.h
DEPENDS sourcery ${CMAKE_CURRENT_SOURCE_DIR}/embed.js
VERBATIM
)
add_custom_target( localSettingsPage
ALL
./sourcery ${BINARY_DIR}/lsp/server.html server_html ${CMAKE_CURRENT_BINARY_DIR}/server.html.h
DEPENDS sourcery lsp
VERBATIM
)
add_executable( MistOutHTTP output/mist_out.cpp output/output.cpp output/output_http.cpp output/output_http_internal.cpp)
set_target_properties( MistOutHTTP PROPERTIES COMPILE_DEFINITIONS "OUTPUTTYPE=\"output_http_internal.h\"")
add_dependencies(MistOutHTTP embedcode)
target_link_libraries( MistOutHTTP mist )
add_executable( MistController
controller/controller.cpp
controller/controller_api.h
controller/controller_api.cpp
controller/controller_capabilities.h
controller/controller_capabilities.cpp
controller/controller_connectors.h
controller/controller_connectors.cpp
controller/controller_statistics.h
controller/controller_statistics.cpp
controller/controller_storage.h
controller/controller_storage.cpp
controller/controller_streams.h
controller/controller_streams.cpp
)
set_target_properties( MistController PROPERTIES COMPILE_DEFINITIONS RELEASE=${RELEASE})
target_link_libraries( MistController mist )
add_dependencies(MistController localSettingsPage)

View file

@ -64,6 +64,7 @@ namespace Controller {
std::string udpPort = data["udpport"].asString();
//Check running
if (!inputProcesses.count(name) || !Util::Procs::isRunning(inputProcesses[name])){
std::string multicast = data["multicastinterface"].asString();
// False: start TS input
INFO_MSG("No TS Input running on port %s for stream %s, starting it", udpPort.c_str(), name.c_str());
std::deque<std::string> command;
@ -72,6 +73,8 @@ namespace Controller {
command.push_back(name);
command.push_back("-p");
command.push_back(udpPort);
command.push_back("-M");
command.push_back(multicast);
command.push_back(URL);
int stdIn = 0;
int stdOut = 1;

View file

@ -29,11 +29,7 @@ namespace Mist {
Input::Input(Util::Config * cfg) : InOutBase() {
config = cfg;
#ifdef INPUT_LIVE
standAlone = false;
#else
standAlone = true;
#endif
JSON::Value option;
option["long"] = "json";
@ -90,7 +86,7 @@ namespace Mist {
}
void Input::checkHeaderTimes(std::string streamFile) {
if (streamFile == "-") {
if (streamFile == "-" || streamFile == "push://") {
return;
}
std::string headerFile = streamFile + ".dtsh";
@ -124,6 +120,7 @@ namespace Mist {
config->getOption("streamname") = streamName;
}
streamName = config->getString("streamname");
nProxy.streamName = streamName;
if (config->getBool("json")) {
std::cout << capa.toString() << std::endl;
return 0;
@ -132,26 +129,19 @@ namespace Mist {
std::cerr << config->getString("cmd") << " setup failed." << std::endl;
return 0;
}
//Do not read the header if this is a live stream
#ifndef INPUT_LIVE
checkHeaderTimes(config->getString("input"));
if (!readHeader()) {
std::cerr << "Reading header for " << config->getString("input") << " failed." << std::endl;
return 0;
}
parseHeader();
#endif
//Live inputs only have a serve() mode
#ifndef INPUT_LIVE
if (!config->getString("streamname").size()) {
if (!streamName.size()) {
convert();
} else {
#endif
serve();
#ifndef INPUT_LIVE
}
#endif
return 0;
}
@ -203,44 +193,6 @@ namespace Mist {
void Input::serve(){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
#ifdef INPUT_LIVE
unsigned int giveUpCounter = 0;
while (!Util::startInput(streamName) && config->is_active && ++giveUpCounter < 20) {
Util::sleep(500);
}
if (giveUpCounter >= 20){
FAIL_MSG("Could not start buffer for stream '%s', aborting stream input!", streamName.c_str());
config->is_active = false;
}
userClient = IPC::sharedClient(userPageName, 30, true);
getNext();
while (thisPacket || config->is_active) {
unsigned long tid = thisPacket.getTrackId();
//Check for eligibility of track
IPC::userConnection userConn(userClient.getData());
if (trackOffset.count(tid) && !userConn.getTrackId(trackOffset[tid])) {
trackOffset.erase(tid);
trackState.erase(tid);
trackMap.erase(tid);
trackBuffer.erase(tid);
pagesByTrack.erase(tid);
metaPages.erase(tid);
curPageNum.erase(tid);
curPage.erase(tid);
INFO_MSG("Erasing track %d", tid);
continue;
}
if (thisPacket) {
continueNegotiate(thisPacket.getTrackId());
bufferLivePacket(thisPacket);
} else {
Util::sleep(100);
}
getNext();
userClient.keepAlive();
}
userClient.finish();
#else
/*LTS-START*/
if(Triggers::shouldTrigger("STREAM_READY", config->getString("streamname"))){
std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n";
@ -283,7 +235,6 @@ namespace Mist {
Util::sleep(1000);
}
}
#endif
finish();
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str());
//end player functionality
@ -297,7 +248,7 @@ namespace Mist {
}
removeUnused();
if (standAlone) {
for (std::map<unsigned long, IPC::sharedPage>::iterator it = metaPages.begin(); it != metaPages.end(); it++) {
for (std::map<unsigned long, IPC::sharedPage>::iterator it = nProxy.metaPages.begin(); it != nProxy.metaPages.end(); it++) {
it->second.master = true;
}
}
@ -316,9 +267,9 @@ namespace Mist {
bufferRemove(it->first, it2->first);
pageCounter[it->first].erase(it2->first);
for (int i = 0; i < 8192; i += 8) {
unsigned int thisKeyNum = ntohl(((((long long int *)(metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
unsigned int thisKeyNum = ntohl(((((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
if (thisKeyNum == it2->first) {
(((long long int *)(metaPages[it->first].mapped + i))[0]) = 0;
(((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) = 0;
}
}
change = true;
@ -359,13 +310,13 @@ namespace Mist {
for (int i = 0; i < it->second.keys.size(); i++) {
if (newData) {
//i+1 because keys are 1-indexed
pagesByTrack[it->first][i + 1].firstTime = it->second.keys[i].getTime();
nProxy.pagesByTrack[it->first][i + 1].firstTime = it->second.keys[i].getTime();
newData = false;
}
pagesByTrack[it->first].rbegin()->second.keyNum++;
pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts();
pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i];
if (pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE) {
nProxy.pagesByTrack[it->first].rbegin()->second.keyNum++;
nProxy.pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts();
nProxy.pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i];
if (nProxy.pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE) {
newData = true;
}
}
@ -398,7 +349,7 @@ namespace Mist {
}
if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum) {
if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) {
pagesByTrack[tid][bookKeeping[tid].first] = curData[tid];
nProxy.pagesByTrack[tid][bookKeeping[tid].first] = curData[tid];
bookKeeping[tid].first += curData[tid].keyNum;
curData[tid].keyNum = 0;
curData[tid].dataSize = 0;
@ -415,17 +366,17 @@ namespace Mist {
getNext(false);
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (curData.count(it->first) && !pagesByTrack[it->first].count(bookKeeping[it->first].first)) {
pagesByTrack[it->first][bookKeeping[it->first].first] = curData[it->first];
if (curData.count(it->first) && !nProxy.pagesByTrack[it->first].count(bookKeeping[it->first].first)) {
nProxy.pagesByTrack[it->first][bookKeeping[it->first].first] = curData[it->first];
}
}
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (!pagesByTrack.count(it->first)) {
if (!nProxy.pagesByTrack.count(it->first)) {
DEBUG_MSG(DLVL_WARN, "No pages for track %d found", it->first);
} else {
DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size());
for (std::map<unsigned long, DTSCPageData>::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++) {
DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), nProxy.pagesByTrack[it->first].size());
for (std::map<unsigned long, DTSCPageData>::iterator it2 = nProxy.pagesByTrack[it->first].begin(); it2 != nProxy.pagesByTrack[it->first].end(); it2++) {
DEBUG_MSG(DLVL_VERYHIGH, "Page %lu-%lu, (%llu bytes)", it2->first, it2->first + it2->second.keyNum - 1, it2->second.dataSize);
}
}
@ -443,10 +394,10 @@ namespace Mist {
if (keyNum < 1) {
keyNum = 1;
}
if (isBuffered(track, keyNum)) {
if (nProxy.isBuffered(track, keyNum)) {
//get corresponding page number
int pageNumber = 0;
for (std::map<unsigned long, DTSCPageData>::iterator it = pagesByTrack[track].begin(); it != pagesByTrack[track].end(); it++) {
for (std::map<unsigned long, DTSCPageData>::iterator it = nProxy.pagesByTrack[track].begin(); it != nProxy.pagesByTrack[track].end(); it++) {
if (it->first <= keyNum) {
pageNumber = it->first;
} else {
@ -457,13 +408,13 @@ namespace Mist {
VERYHIGH_MSG("Track %u, key %u is already buffered in page %d. Cancelling bufferFrame", track, keyNum, pageNumber);
return true;
}
if (!pagesByTrack.count(track)) {
if (!nProxy.pagesByTrack.count(track)) {
WARN_MSG("No pages for track %u found! Cancelling bufferFrame", track);
return false;
}
//Update keynum to point to the corresponding page
INFO_MSG("Loading key %u from page %lu", keyNum, (--(pagesByTrack[track].upper_bound(keyNum)))->first);
keyNum = (--(pagesByTrack[track].upper_bound(keyNum)))->first;
INFO_MSG("Loading key %u from page %lu", keyNum, (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first);
keyNum = (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first;
if (!bufferStart(track, keyNum)) {
WARN_MSG("bufferStart failed! Cancelling bufferFrame");
return false;
@ -474,8 +425,8 @@ namespace Mist {
trackSelect(trackSpec.str());
seek(myMeta.tracks[track].keys[keyNum - 1].getTime());
long long unsigned int stopTime = myMeta.tracks[track].lastms + 1;
if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + pagesByTrack[track][keyNum].keyNum) {
stopTime = myMeta.tracks[track].keys[keyNum - 1 + pagesByTrack[track][keyNum].keyNum].getTime();
if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + nProxy.pagesByTrack[track][keyNum].keyNum) {
stopTime = myMeta.tracks[track].keys[keyNum - 1 + nProxy.pagesByTrack[track][keyNum].keyNum].getTime();
}
DEBUG_MSG(DLVL_HIGH, "Playing from %llu to %llu", myMeta.tracks[track].keys[keyNum - 1].getTime(), stopTime);
getNext();

View file

@ -89,17 +89,34 @@ namespace Mist {
capa["optional"]["segmentsize"]["type"] = "uint";
capa["optional"]["segmentsize"]["default"] = 5000LL;
option.null();
option["arg"] = "integer";
option["arg"] = "string";
option["long"] = "udp-port";
option["short"] = "U";
option["help"] = "The UDP port on which to listen for TS Packets";
option["value"].append(0LL);
option["value"].append("");
config->addOption("udpport", option);
capa["optional"]["udpport"]["name"] = "TS/UDP port";
capa["optional"]["udpport"]["help"] = "The UDP port on which to listen for TS Packets, or 0 for disabling TS Input";
capa["optional"]["udpport"]["help"] = "The UDP port on which to listen for TS Packets, or 0 for disabling TS Input, optionally prefixed with the interface IP to listen on.";
capa["optional"]["udpport"]["option"] = "--udp-port";
capa["optional"]["udpport"]["type"] = "uint";
capa["optional"]["udpport"]["default"] = 0LL;
capa["optional"]["udpport"]["type"] = "str";
capa["optional"]["udpport"]["default"] = "";
option.null();
option["arg"] = "string";
option["long"] = "multicast-interface";
option["short"] = "M";
option["help"] = "The interface(s)s on which to listen for UDP Multicast packets, space separated.";
option["value"].append("");
config->addOption("multicastinterface", option);
capa["optional"]["multicastinterface"]["name"] = "TS Multicast interface";
capa["optional"]["multicastinterface"]["help"] = "The interface(s) on which to listen for UDP Multicast packets, comma separated.";
capa["optional"]["multicastinterface"]["option"] = "--multicast-interface";
capa["optional"]["multicastinterface"]["type"] = "str";
capa["optional"]["multicastinterface"]["default"] = "";
option.null();
/*LTS-end*/
capa["source_match"] = "push://*";
capa["priority"] = 9ll;
@ -130,12 +147,12 @@ namespace Mist {
DEBUG_MSG(DLVL_DEVEL, "Cleaning up, removing last keyframes");
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[it->first];
if (!metaPages.count(it->first) || !metaPages[it->first].mapped){
if (!nProxy.metaPages.count(it->first) || !nProxy.metaPages[it->first].mapped){
continue;
}
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8){
int * tmpOffset = (int *)(metaPages[it->first].mapped + i);
int * tmpOffset = (int *)(nProxy.metaPages[it->first].mapped + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
continue;
}
@ -206,14 +223,14 @@ namespace Mist {
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.wait();
if (!metaPages.count(0) || !metaPages[0].mapped){
if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped){
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
metaPages[0].init(pageName, DEFAULT_META_PAGE_SIZE, true);
metaPages[0].master = false;
nProxy.metaPages[0].init(pageName, DEFAULT_META_PAGE_SIZE, true);
nProxy.metaPages[0].master = false;
}
myMeta.writeTo(metaPages[0].mapped);
memset(metaPages[0].mapped + myMeta.getSendLen(), 0, (metaPages[0].len > myMeta.getSendLen() ? std::min(metaPages[0].len - myMeta.getSendLen(), 4ll) : 0));
myMeta.writeTo(nProxy.metaPages[0].mapped);
memset(nProxy.metaPages[0].mapped + myMeta.getSendLen(), 0, (nProxy.metaPages[0].len > myMeta.getSendLen() ? std::min(nProxy.metaPages[0].len - myMeta.getSendLen(), 4ll) : 0));
liveMeta.post();
}
@ -296,15 +313,15 @@ namespace Mist {
if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){
//Find page in indexpage and null it
for (int i = 0; i < 8192; i += 8){
unsigned int thisKeyNum = ((((long long int *)(metaPages[tid].mapped + i))[0]) >> 32) & 0xFFFFFFFF;
if (thisKeyNum == htonl(bufferLocations[tid].begin()->first) && ((((long long int *)(metaPages[tid].mapped + i))[0]) != 0)){
(((long long int *)(metaPages[tid].mapped + i))[0]) = 0;
unsigned int thisKeyNum = ((((long long int *)(nProxy.metaPages[tid].mapped + i))[0]) >> 32) & 0xFFFFFFFF;
if (thisKeyNum == htonl(bufferLocations[tid].begin()->first) && ((((long long int *)(nProxy.metaPages[tid].mapped + i))[0]) != 0)){
(((long long int *)(nProxy.metaPages[tid].mapped + i))[0]) = 0;
}
}
DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
bufferRemove(tid, bufferLocations[tid].begin()->first);
for (int i = 0; i < 1024; i++){
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
int * tmpOffset = (int *)(nProxy.metaPages[tid].mapped + (i * 8));
int tmpNum = ntohl(tmpOffset[0]);
if (tmpNum == bufferLocations[tid].begin()->first){
tmpOffset[0] = 0;
@ -312,12 +329,12 @@ namespace Mist {
}
}
curPageNum.erase(tid);
nProxy.curPageNum.erase(tid);
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), (unsigned long)tid, bufferLocations[tid].begin()->first);
curPage[tid].init(thisPageName, 20971520);
curPage[tid].master = true;
curPage.erase(tid);
nProxy.curPage[tid].init(thisPageName, 20971520);
nProxy.curPage[tid].master = true;
nProxy.curPage.erase(tid);
bufferLocations[tid].erase(bufferLocations[tid].begin());
} else {
@ -334,13 +351,13 @@ namespace Mist {
for (std::map<unsigned long, DTSCPageData>::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++){
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tid, it->first);
curPage[tid].init(thisPageName, 20971520, false, false);
curPage[tid].master = true;
curPage.erase(tid);
nProxy.curPage[tid].init(thisPageName, 20971520, false, false);
nProxy.curPage[tid].master = true;
nProxy.curPage.erase(tid);
}
bufferLocations.erase(tid);
metaPages[tid].master = true;
metaPages.erase(tid);
nProxy.metaPages[tid].master = true;
nProxy.metaPages.erase(tid);
}
void inputBuffer::finish() {
@ -410,9 +427,9 @@ namespace Mist {
while (bufferLocations[tid].size()){
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), (unsigned long)tid, bufferLocations[tid].begin()->first);
curPage[tid].init(thisPageName, 20971520);
curPage[tid].master = true;
curPage.erase(tid);
nProxy.curPage[tid].init(thisPageName, 20971520);
nProxy.curPage[tid].master = true;
nProxy.curPage.erase(tid);
bufferLocations[tid].erase(bufferLocations[tid].begin());
}
if (pushLocation.count(it->first)){
@ -427,9 +444,9 @@ namespace Mist {
}
pushLocation.erase(it->first);
}
curPageNum.erase(it->first);
metaPages[it->first].master = true;
metaPages.erase(it->first);
nProxy.curPageNum.erase(it->first);
nProxy.metaPages[it->first].master = true;
nProxy.metaPages.erase(it->first);
activeTracks.erase(it->first);
myMeta.tracks.erase(it->first);
changed = true;
@ -522,8 +539,8 @@ namespace Mist {
activeTracks.erase(value);
bufferLocations.erase(value);
}
metaPages[value].master = true;
metaPages.erase(value);
nProxy.metaPages[value].master = true;
nProxy.metaPages.erase(value);
continue;
}
}
@ -546,13 +563,13 @@ namespace Mist {
//The track id is set to the value of a track that we are currently negotiating about
if (negotiatingTracks.count(value)){
//If the metadata page for this track is not yet registered, initialize it
if (!metaPages.count(value) || !metaPages[value].mapped){
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped){
char tempMetaName[NAME_BUFFER_SIZE];
snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), value);
metaPages[value].init(tempMetaName, 8388608, false, false);
nProxy.metaPages[value].init(tempMetaName, 8388608, false, false);
}
//If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later
if (!metaPages[value].mapped) {
if (!nProxy.metaPages[value].mapped) {
//remove the negotiation if it has timed out
if (++negotiationTimeout[value] >= 1000){
negotiatingTracks.erase(value);
@ -564,13 +581,13 @@ namespace Mist {
//The page exist, now we try to read in the metadata of the track
//Store the size of the dtsc packet to read.
unsigned int len = ntohl(((int *)metaPages[value].mapped)[1]);
unsigned int len = ntohl(((int *)nProxy.metaPages[value].mapped)[1]);
//Temporary variable, won't be used again
unsigned int tempForReadingMeta = 0;
//Read in the metadata through a temporary JSON object
///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately
JSON::Value tempJSONForMeta;
JSON::fromDTMI((const unsigned char *)metaPages[value].mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
JSON::fromDTMI((const unsigned char *)nProxy.metaPages[value].mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
//Construct a metadata object for the current track
DTSC::Meta trackMeta(tempJSONForMeta);
//If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call.
@ -579,8 +596,8 @@ namespace Mist {
if (++negotiationTimeout[value] >= 1000){
negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
metaPages[value].master = true;
metaPages.erase(value);
nProxy.metaPages[value].master = true;
nProxy.metaPages.erase(value);
negotiationTimeout.erase(value);
}
continue;
@ -603,8 +620,8 @@ namespace Mist {
//Remove the "negotiate" status in either case
negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
metaPages[value].master = true;
metaPages.erase(value);
nProxy.metaPages[value].master = true;
nProxy.metaPages.erase(value);
//Check if the track collides, and whether the track it collides with is active.
if (collidesWith != -1 && activeTracks.count(collidesWith)){/*LTS*/
@ -639,8 +656,8 @@ namespace Mist {
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
updateMeta();
eraseTrackDataPages(value);
metaPages[finalMap].master = true;
metaPages.erase(finalMap);
nProxy.metaPages[finalMap].master = true;
nProxy.metaPages.erase(finalMap);
bufferLocations.erase(finalMap);
}
@ -666,12 +683,12 @@ namespace Mist {
//If the track is active, and this is the element responsible for pushing it
if (activeTracks.count(value) && pushLocation[value] == data){
//Open the track index page if we dont have it open yet
if (!metaPages.count(value) || !metaPages[value].mapped){
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped){
char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), value);
metaPages[value].init(firstPage, 8192, false, false);
nProxy.metaPages[value].init(firstPage, 8192, false, false);
}
if (metaPages[value].mapped){
if (nProxy.metaPages[value].mapped){
//Update the metadata for this track
updateTrackMeta(value);
hasPush = true;
@ -684,7 +701,7 @@ namespace Mist {
VERYHIGH_MSG("Updating meta for track %d", tNum);
//Store a reference for easier access
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum];
char * mappedPointer = metaPages[tNum].mapped;
char * mappedPointer = nProxy.metaPages[tNum].mapped;
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) {
@ -725,27 +742,27 @@ namespace Mist {
//Otherwise open and parse the page
//Open the page if it is not yet open
if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum || !curPage[tNum].mapped){
if (!nProxy.curPageNum.count(tNum) || nProxy.curPageNum[tNum] != pageNum || !nProxy.curPage[tNum].mapped){
//DO NOT ERASE THE PAGE HERE, master is not set to true
curPageNum.erase(tNum);
nProxy.curPageNum.erase(tNum);
char nextPageName[NAME_BUFFER_SIZE];
snprintf(nextPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tNum, pageNum);
curPage[tNum].init(nextPageName, 20971520);
nProxy.curPage[tNum].init(nextPageName, 20971520);
//If the page can not be opened, stop here
if (!curPage[tNum].mapped){
if (!nProxy.curPage[tNum].mapped){
WARN_MSG("Could not open page: %s", nextPageName);
return;
}
curPageNum[tNum] = pageNum;
nProxy.curPageNum[tNum] = pageNum;
}
DTSC::Packet tmpPack;
if (!curPage[tNum].mapped[pageData.curOffset]){
if (!nProxy.curPage[tNum].mapped[pageData.curOffset]){
VERYHIGH_MSG("No packet on page %lu for track %lu, waiting...", pageNum, tNum);
return;
}
tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0);
tmpPack.reInit(nProxy.curPage[tNum].mapped + pageData.curOffset, 0);
//No new data has been written on the page since last update
if (!tmpPack){
return;
@ -761,7 +778,7 @@ namespace Mist {
//Update the offset on the page with the size of the current packet
pageData.curOffset += tmpPack.getDataLen();
//Attempt to read in the next packet
tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0);
tmpPack.reInit(nProxy.curPage[tNum].mapped + pageData.curOffset, 0);
}
}

View file

@ -10,9 +10,16 @@
#include <mist/flv_tag.h>
#include <mist/defines.h>
#include <mist/ts_packet.h>
#include <mist/timing.h>
#include <mist/mp4_generic.h>
#include "input_ts.h"
#include <mist/tinythread.h>
#include <sys/stat.h>
/// \todo Implement this trigger equivalent...
/*
if(Triggers::shouldTrigger("STREAM_PUSH", smp)){
@ -27,10 +34,82 @@ if(Triggers::shouldTrigger("STREAM_PUSH", smp)){
}
*/
#ifdef TSLIVE_INPUT
std::string globalStreamName;
TS::Stream liveStream(true);
Util::Config * cfgPointer = NULL;
#define THREAD_TIMEOUT 15
std::map<unsigned long long, unsigned long long> threadTimer;
std::set<unsigned long> claimableThreads;
void parseThread(void * ignored) {
std::string semName = "MstInTSStreamClaim" + globalStreamName;
IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
int tid = -1;
lock.wait();
if (claimableThreads.size()) {
tid = *claimableThreads.begin();
claimableThreads.erase(claimableThreads.begin());
}
lock.post();
if (tid == -1) {
return;
}
if (liveStream.isDataTrack(tid)){
if (!Util::startInput(globalStreamName)) {
return;
}
}
Mist::negotiationProxy myProxy;
myProxy.streamName = globalStreamName;
DTSC::Meta myMeta;
if (liveStream.isDataTrack(tid)){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, globalStreamName.c_str());
myProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
}
threadTimer[tid] = Util::bootSecs();
while (Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT && cfgPointer->is_active) {
liveStream.parse(tid);
if (liveStream.hasPacket(tid)){
liveStream.initializeMetadata(myMeta, tid);
DTSC::Packet pack;
liveStream.getPacket(tid, pack);
if (pack && myMeta.tracks.count(tid)){
myProxy.bufferLivePacket(pack, myMeta);
}
lock.wait();
threadTimer[tid] = Util::bootSecs();
lock.post();
}
if (liveStream.isDataTrack(tid)){
myProxy.userClient.keepAlive();
}
if (!liveStream.hasPacket(tid)){
Util::sleep(100);
}
}
lock.wait();
threadTimer.erase(tid);
lock.post();
liveStream.eraseTrack(tid);
myProxy.userClient.finish();
}
#endif
namespace Mist {
/// Constructor of TS Input
/// \arg cfg Util::Config that contains all current configurations.
inputTS::inputTS(Util::Config * cfg) : Input(cfg) {
@ -44,44 +123,80 @@ namespace Mist {
capa["codecs"][0u][1u].append("AC3");
capa["optional"]["port"]["name"] = "UDP Port";
capa["optional"]["port"]["help"] = "The udp port on which to listen for incoming UDP Packets";
capa["optional"]["port"]["type"] = "uint";
capa["optional"]["port"]["default"] = 9876;
capa["optional"]["port"]["help"] = "The UDP port on which to listen for incoming UDP Packets, optionally prefixed by the interface IP.";
capa["optional"]["port"]["type"] = "string";
capa["optional"]["port"]["default"] = "9876";
capa["optional"]["port"]["option"] = "--port";
cfg->addOption("port",
JSON::fromString("{\"arg\":\"integer\",\"value\":9876,\"short\":\"p\",\"long\":\"port\",\"help\":\"The udp port on which to listen for incoming UDP Packets.\"}"));
JSON::fromString("{\"arg\":\"string\",\"value\":9876,\"short\":\"p\",\"long\":\"port\",\"help\":\"The UDP port on which to listen for incoming UDP Packets, optionally prefixed by the interface IP.\"}"));
capa["optional"]["multicastinterface"]["name"] = "TS Multicast interface";
capa["optional"]["multicastinterface"]["help"] = "The interface(s) on which to listen for UDP Multicast packets, comma separated.";
capa["optional"]["multicastinterface"]["option"] = "--multicast-interface";
capa["optional"]["multicastinterface"]["type"] = "str";
capa["optional"]["multicastinterface"]["default"] = "";
cfg->addOption("multicastinterface",
JSON::fromString("{\"arg\":\"string\",\"value\":\"\",\"short\":\"M\",\"long\":\"multicast-interface\",\"help\":\"The interfaces on which to listen for UDP Multicast packets, space separatered.\"}"));
pushing = false;
inFile = NULL;
#ifdef TSLIVE_INPUT
standAlone = false;
#endif
}
inputTS::~inputTS() {
if (inFile){
if (inFile) {
fclose(inFile);
}
#ifdef TSLIVE_INPUT
std::string semName = "MstInTSStreamClaim" + globalStreamName;
IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
lock.wait();
threadTimer.clear();
claimableThreads.clear();
lock.post();
#endif
}
///Setup of TS Input
#ifdef TSLIVE_INPUT
///Live Setup of TS Input
bool inputTS::setup() {
#ifdef INPUT_LIVE
INFO_MSG("Setup start");
if (config->getString("input") == "-") {
inFile = stdin;
}else{
} else {
pushing = true;
udpCon.setBlocking(false);
udpCon.bind(config->getInteger("port"));
std::string ipPort = config->getString("port");
size_t colon = ipPort.rfind(':');
if (colon != std::string::npos) {
udpCon.bind(JSON::Value(ipPort.substr(colon + 1)).asInt(), ipPort.substr(0, colon), config->getString("multicastinterface"));
} else {
udpCon.bind(JSON::Value(ipPort).asInt(), "", config->getString("multicastinterface"));
}
}
INFO_MSG("Setup complete");
return true;
}
#else
if (config->getString("input") != "-"){
///Setup of TS Input
bool inputTS::setup() {
if (config->getString("input") != "-") {
inFile = fopen(config->getString("input").c_str(), "r");
}
if (!inFile) {
return false;
}
#endif
return true;
}
#endif
///Track selector of TS Input
///\arg trackSpec specifies which tracks are to be selected
///\todo test whether selecting a subset of tracks work
@ -99,33 +214,40 @@ namespace Mist {
}
}
#ifdef TSLIVE_INPUT
//This implementation in used in the live version of TS input, where no header is available in advance.
//Reading the header returns true in this case, to continue parsing the actual stream.
bool inputTS::readHeader() {
return true;
}
#else
///Reads headers from a TS stream, and saves them into metadata
///It works by going through the entire TS stream, and every time
///It encounters a new PES start, it writes the currently found PES data
///for a specific track to metadata. After the entire stream has been read,
///for a specific track to metadata. After the entire stream has been read,
///it writes the remaining metadata.
///\todo Find errors, perhaps parts can be made more modular
bool inputTS::readHeader(){
bool inputTS::readHeader() {
if (!inFile) {
return false;
}
DTSC::File tmp(config->getString("input") + ".dtsh");
if (tmp){
if (tmp) {
myMeta = tmp.getMeta();
return true;
}
}
TS::Packet packet;//to analyse and extract data
fseek(inFile, 0, SEEK_SET);//seek to beginning
bool first = true;
long long int lastBpos = 0;
while (packet.FromFile(inFile) && !feof(inFile)){
while (packet.FromFile(inFile) && !feof(inFile)) {
tsStream.parse(packet, lastBpos);
lastBpos = ftell(inFile);
while(tsStream.hasPacketOnEachTrack()){
if (first){
while (tsStream.hasPacketOnEachTrack()) {
if (first) {
tsStream.initializeMetadata(myMeta);
first = false;
}
@ -142,106 +264,176 @@ namespace Mist {
oFile.close();
return true;
}
#endif
///Gets the next packet that is to be sent
///At the moment, the logic of sending the last packet that was finished has been implemented,
///Gets the next packet that is to be sent
///At the moment, the logic of sending the last packet that was finished has been implemented,
///but the seeking and finding data is not yet ready.
///\todo Finish the implementation
void inputTS::getNext(bool smart){
void inputTS::getNext(bool smart) {
INSANE_MSG("Getting next");
thisPacket.null();
bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
while (!hasPacket && (pushing || !feof(inFile)) && config->is_active){
while (!hasPacket && (pushing || !feof(inFile)) && config->is_active) {
if (!pushing) {
unsigned int bPos = ftell(inFile);
tsBuf.FromFile(inFile);
if (selectedTracks.count(tsBuf.getPID())){
if (selectedTracks.count(tsBuf.getPID())) {
tsStream.parse(tsBuf, bPos);
}
}else{
while (udpCon.Receive()){
} else {
while (udpCon.Receive()) {
udpDataBuffer.append(udpCon.data, udpCon.data_len);
while (udpDataBuffer.size() > 188 && (udpDataBuffer[0] != 0x47 || udpDataBuffer[188] != 0x47)){
while (udpDataBuffer.size() > 188 && (udpDataBuffer[0] != 0x47 || udpDataBuffer[188] != 0x47)) {
size_t syncPos = udpDataBuffer.find("\107", 1);
udpDataBuffer.erase(0, syncPos);
}
while (udpDataBuffer.size() >= 188){
while (udpDataBuffer.size() >= 188) {
tsBuf.FromPointer(udpDataBuffer.data());
tsStream.parse(tsBuf, 0);
udpDataBuffer.erase(0,188);
udpDataBuffer.erase(0, 188);
}
}
if (userClient.getData()){
userClient.keepAlive();
}
Util::sleep(500);
}
if (userClient.getData()){
userClient.keepAlive();
}
hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
}
if (!hasPacket){
if(inFile && !feof(inFile)){
if (!hasPacket) {
if (inFile && !feof(inFile)) {
getNext();
}
if (pushing){
if (pushing) {
sleep(500);
}
return;
}
if (selectedTracks.size() == 1){
if (selectedTracks.size() == 1) {
tsStream.getPacket(*selectedTracks.begin(), thisPacket);
}else{
} else {
tsStream.getEarliestPacket(thisPacket);
}
tsStream.initializeMetadata(myMeta);
if (!myMeta.tracks.count(thisPacket.getTrackId())){
if (!myMeta.tracks.count(thisPacket.getTrackId())) {
getNext();
}
}
void inputTS::readPMT(){
void inputTS::readPMT() {
//save current file position
int bpos = ftell(inFile);
if (fseek(inFile, 0, SEEK_SET)){
if (fseek(inFile, 0, SEEK_SET)) {
FAIL_MSG("Seek to 0 failed");
return;
}
TS::Packet tsBuffer;
while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromFile(inFile)){
while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromFile(inFile)) {
tsStream.parse(tsBuffer, 0);
}
//Clear leaves the PMT in place
tsStream.clear();
//Restore original file position
if (fseek(inFile, bpos, SEEK_SET)){
if (fseek(inFile, bpos, SEEK_SET)) {
return;
}
}
///Seeks to a specific time
void inputTS::seek(int seekTime){
void inputTS::seek(int seekTime) {
tsStream.clear();
readPMT();
unsigned long seekPos = 0xFFFFFFFFull;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) {
unsigned long thisBPos = 0;
for (std::deque<DTSC::Key>::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++){
if (keyIt->getTime() > seekTime){
for (std::deque<DTSC::Key>::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++) {
if (keyIt->getTime() > seekTime) {
break;
}
thisBPos = keyIt->getBpos();
}
if (thisBPos < seekPos){
if (thisBPos < seekPos) {
seekPos = thisBPos;
}
}
fseek(inFile, seekPos, SEEK_SET);//seek to the correct position
}
#ifdef TSLIVE_INPUT
void inputTS::serve() {
cfgPointer = config;
globalStreamName = streamName;
unsigned long long threadCheckTimer = Util::bootSecs();
while (config->is_active) {
if (!pushing) {
unsigned int bPos = ftell(inFile);
int ctr = 0;
while (ctr < 20 && tsBuf.FromFile(inFile)){
liveStream.add(tsBuf);
ctr++;
}
} else {
while (udpCon.Receive()) {
int offset = 0;
//Try to read full TS Packets
//Assumption here is made that one UDP Datagram consists of complete TS Packets.
//Assumption made because of possible packet loss issues
while ((udpCon.data_len - offset) >= 188) {
//Watch out! We push here to a global, in order for threads to be able to access it.
liveStream.add(udpCon.data + offset);
offset += 188;
}
if (offset < udpCon.data_len) {
WARN_MSG("%d bytes left in datagram", udpCon.data_len - offset);
}
}
}
//Check for and spawn threads here.
if (Util::bootSecs() - threadCheckTimer > 2) {
std::set<unsigned long> activeTracks = liveStream.getActiveTracks();
std::string semName = "MstInTSStreamClaim" + globalStreamName;
IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
lock.wait();
for (std::set<unsigned long>::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) {
if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) {
WARN_MSG("Thread for track %d timed out %d seconds ago without a clean shutdown.", *it, Util::bootSecs() - threadTimer[*it]);
threadTimer.erase(*it);
}
if (!threadTimer.count(*it)) {
//Add to list of unclaimed threads
claimableThreads.insert(*it);
//Spawn thread here.
tthread::thread thisThread(parseThread, 0);
thisThread.detach();
}
}
lock.post();
threadCheckTimer = Util::bootSecs();
}
Util::sleep(100);
}
finish();
INFO_MSG("Input for stream %s closing clean", streamName.c_str());
}
void inputTS::finish() {
std::string semName = "MstInTSStreamClaim" + globalStreamName;
IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
int threadCount = 0;
do {
lock.wait();
threadCount = threadTimer.size();
lock.post();
} while (threadCount);
}
#endif
}

View file

@ -22,6 +22,17 @@ namespace Mist {
void trackSelect(std::string trackSpec);
void readPMT();
#ifdef TSLIVE_INPUT
//Live tsinput does not have a header, so parseheader should do nothing
void parseHeader() { }
//In case of live TS Input, we override the default serve function
void serve();
void finish();
#endif
FILE * inFile;///<The input file with ts data
TS::Stream tsStream;///<Used for parsing the incoming ts stream

View file

@ -16,17 +16,17 @@ namespace Mist {
//Open the page for the metadata
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
metaPages[0].init(pageName, myMeta.getSendLen(), true);
nProxy.metaPages[0].init(pageName, myMeta.getSendLen(), true);
//Make sure we don't delete it on accident
metaPages[0].master = false;
nProxy.metaPages[0].master = false;
//Write the metadata to the page
myMeta.writeTo(metaPages[0].mapped);
myMeta.writeTo(nProxy.metaPages[0].mapped);
}
/*LTS-START*/
void InOutBase::initiateEncryption(){
void negotiationProxy::initiateEncryption(){
static bool encInit = false;
if (encInit){
return;
@ -58,6 +58,24 @@ namespace Mist {
}
/*LTS-END*/
bool InOutBase::bufferStart(unsigned long tid, unsigned long pageNumber) {
VERYHIGH_MSG("bufferStart for stream %s, track %lu, page %lu", streamName.c_str(), tid, pageNumber);
//Initialize the stream metadata if it does not yet exist
#ifndef TSLIVE_INPUT
if (!nProxy.metaPages.count(0)) {
initiateMeta();
}
#endif
//If we are a stand-alone player skip track negotiation, as there will be nothing to negotiate with.
if (standAlone) {
if (!nProxy.trackMap.count(tid)) {
nProxy.trackMap[tid] = tid;
}
}
return nProxy.bufferStart(tid, pageNumber, myMeta);
}
///Starts the buffering of a new page.
///
///Does not do any actual buffering, just sets the right bits for buffering to go right.
@ -65,23 +83,10 @@ namespace Mist {
///Buffering itself is done by bufferNext().
///\param tid The trackid of the page to start buffering
///\param pageNumber The number of the page to start buffering
bool InOutBase::bufferStart(unsigned long tid, unsigned long pageNumber) {
VERYHIGH_MSG("bufferStart for stream %s, track %lu, page %lu", streamName.c_str(), tid, pageNumber);
bool negotiationProxy::bufferStart(unsigned long tid, unsigned long pageNumber, DTSC::Meta & myMeta) {
initiateEncryption();
//Initialize the stream metadata if it does not yet exist
#ifndef INPUT_LIVE
if (!metaPages.count(0)) {
initiateMeta();
}
#endif
//If we are a stand-alone player skip track negotiation, as there will be nothing to negotiate with.
if (standAlone) {
if (!trackMap.count(tid)) {
trackMap[tid] = tid;
}
}
//Negotiate the requested track if needed.
continueNegotiate(tid);
continueNegotiate(tid, myMeta);
//If the negotation state for this track is not 'Accepted', stop buffering this page, maybe try again later.
if (trackState[tid] != FILL_ACC) {
@ -142,6 +147,9 @@ namespace Mist {
//Initialize the bookkeeping entry, and set the current offset to 0, to allow for using it in bufferNext()
pagesByTrack[tid][pageNumber].curOffset = 0;
HIGH_MSG("Start buffering page %lu on track %lu~>%lu successful", pageNumber, tid, mapTid);
if (myMeta.live){
//Register this page on the meta page
//NOTE: It is important that this only happens if the stream is live....
@ -150,7 +158,7 @@ namespace Mist {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
if ((tmpOffset[0] == 0 && tmpOffset[1] == 0)) {
tmpOffset[0] = htonl(curPageNum[tid]);
if (pagesByTrack[tid][pageNumber].dataSize == (25 * 1024 * 1024)){
if (pagesByTrack[tid][pageNumber].dataSize == DEFAULT_DATA_PAGE_SIZE){
tmpOffset[1] = htonl(1000);
} else {
tmpOffset[1] = htonl(pagesByTrack[tid][pageNumber].keyNum);
@ -160,8 +168,8 @@ namespace Mist {
}
}
}
HIGH_MSG("Start buffering page %lu on track %lu~>%lu successful", pageNumber, tid, mapTid);
///\return true if everything was successful
return true;
}
@ -176,13 +184,13 @@ namespace Mist {
//A different process will handle this for us
return;
}
unsigned long mapTid = trackMap[tid];
if (!pagesByTrack.count(tid)){
unsigned long mapTid = nProxy.trackMap[tid];
if (!nProxy.pagesByTrack.count(tid)){
// If there is no pagesByTrack entry, the pages are managed in local code and not through io.cpp (e.g.: MistInBuffer)
return;
}
//If the given pagenumber is not a valid page on this track, do nothing
if (!pagesByTrack[tid].count(pageNumber)){
if (!nProxy.pagesByTrack[tid].count(pageNumber)){
INFO_MSG("Can't remove page %lu on track %lu~>%lu as it is not a valid page number.", pageNumber, tid, mapTid);
return;
}
@ -194,7 +202,7 @@ namespace Mist {
#ifdef __CYGWIN__
toErase.init(pageName, 26 * 1024 * 1024, false);
#else
toErase.init(pageName, pagesByTrack[tid][pageNumber].dataSize, false);
toErase.init(pageName, nProxy.pagesByTrack[tid][pageNumber].dataSize, false);
#endif
//Set the master flag so that the page will be destroyed once it leaves scope
#if defined(__CYGWIN__) || defined(_WIN32)
@ -204,7 +212,7 @@ namespace Mist {
//Remove the page from the tracks index page
DEBUG_MSG(DLVL_HIGH, "Removing page %lu on track %lu~>%lu from the corresponding metaPage", pageNumber, tid, mapTid);
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
int * tmpOffset = (int *)(nProxy.metaPages[tid].mapped + (i * 8));
if (ntohl(tmpOffset[0]) == pageNumber) {
tmpOffset[0] = 0;
tmpOffset[1] = 0;
@ -217,7 +225,7 @@ namespace Mist {
///Checks whether a key is buffered
///\param tid The trackid on which to locate the key
///\param keyNum The number of the keyframe to find
bool InOutBase::isBuffered(unsigned long tid, unsigned long keyNum) {
bool negotiationProxy::isBuffered(unsigned long tid, unsigned long keyNum) {
///\return The result of bufferedOnPage(tid, keyNum)
return bufferedOnPage(tid, keyNum);
}
@ -225,7 +233,7 @@ namespace Mist {
///Returns the pagenumber where this key is buffered on
///\param tid The trackid on which to locate the key
///\param keyNum The number of the keyframe to find
unsigned long InOutBase::bufferedOnPage(unsigned long tid, unsigned long keyNum) {
unsigned long negotiationProxy::bufferedOnPage(unsigned long tid, unsigned long keyNum) {
//Check whether the track is accepted
if (!trackMap.count(tid) || !metaPages.count(tid) || !metaPages[tid].mapped) {
///\return 0 if the page has not been mapped yet
@ -252,12 +260,16 @@ namespace Mist {
std::string packData = pack.toNetPacked();
DTSC::Packet newPack(packData.data(), packData.size());
///\note Internally calls bufferNext(DTSC::Packet & pack)
bufferNext(newPack);
nProxy.bufferNext(newPack, myMeta);
}
///Buffers the next packet on the currently opened page
///\param pack The packet to buffer
void InOutBase::bufferNext(DTSC::Packet & pack) {
nProxy.bufferNext(pack, myMeta);
}
void negotiationProxy::bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta) {
//Save the trackid of the track for easier access
unsigned long tid = pack.getTrackId();
unsigned long mapTid = trackMap[tid];
@ -330,6 +342,10 @@ namespace Mist {
///Registers the data page on the track index page as well
///\param tid The trackid of the page to finalize
void InOutBase::bufferFinalize(unsigned long tid) {
nProxy.bufferFinalize(tid, myMeta);
}
void negotiationProxy::bufferFinalize(unsigned long tid, DTSC::Meta & myMeta){
unsigned long mapTid = trackMap[tid];
//If no page is open, do nothing
if (!curPage.count(tid)) {
@ -411,6 +427,10 @@ namespace Mist {
///Initiates/continues negotiation with the buffer as well
///\param packet The packet to buffer
void InOutBase::bufferLivePacket(DTSC::Packet & packet){
nProxy.bufferLivePacket(packet, myMeta);
}
void negotiationProxy::bufferLivePacket(DTSC::Packet & packet, DTSC::Meta & myMeta){
myMeta.vod = false;
myMeta.live = true;
//Store the trackid for easier access
@ -422,7 +442,7 @@ namespace Mist {
}
//If the track is not negotiated yet, start the negotiation
if (!trackState.count(tid)) {
continueNegotiate(tid);
continueNegotiate(tid, myMeta);
}
//If the track is declined, stop here
if (trackState[tid] == FILL_DEC) {
@ -443,7 +463,7 @@ namespace Mist {
if (shouldBlock) {
while (trackState[tid] != FILL_DEC && trackState[tid] != FILL_ACC) {
INFO_MSG("Blocking on track %lu", tid);
continueNegotiate(tid);
continueNegotiate(tid, myMeta);
Util::sleep(500);
}
}
@ -504,16 +524,19 @@ namespace Mist {
if (!curPageNum.count(tid) || nextPageNum != curPageNum[tid]) {
if (curPageNum.count(tid)) {
//Close the currently opened page when it exists
bufferFinalize(tid);
bufferFinalize(tid, myMeta);
}
//Open the new page
bufferStart(tid, nextPageNum);
bufferStart(tid, nextPageNum, myMeta);
}
//Buffer the packet
bufferNext(packet);
bufferNext(packet, myMeta);
}
void InOutBase::continueNegotiate(unsigned long tid) {
nProxy.continueNegotiate(tid, myMeta);
}
void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta) {
if (!tid) {
return;
}
@ -559,7 +582,7 @@ namespace Mist {
if (!userClient.getData()){
char userPageName[100];
sprintf(userPageName, SHM_USERS, streamName.c_str());
userClient = IPC::sharedClient(userPageName, 30, true);
userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
}
char * tmp = userClient.getData();
if (!tmp) {

View file

@ -26,6 +26,42 @@ namespace Mist {
unsigned long lastKeyTime;///<The last key time encountered on this track.
};
class negotiationProxy {
public:
negotiationProxy() : encrypt(false) {}
void initiateEncryption();//LTS
bool bufferStart(unsigned long tid, unsigned long pageNumber, DTSC::Meta & myMeta);
void bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta);
void bufferFinalize(unsigned long tid, DTSC::Meta &myMeta);
void bufferLivePacket(DTSC::Packet & packet, DTSC::Meta & myMeta);
bool isBuffered(unsigned long tid, unsigned long keyNum);
unsigned long bufferedOnPage(unsigned long tid, unsigned long keyNum);
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > pagesByTrack;///<Holds the data for all pages of a track. Based on unmapped tids
//Negotiation stuff (from unmapped tid's)
std::map<unsigned long, unsigned long> trackOffset; ///< Offset of data on user page
std::map<unsigned long, negotiationState> trackState; ///< State of the negotiation for tracks
std::map<unsigned long, unsigned long> trackMap;///<Determines which input track maps onto which "final" track
std::map<unsigned long, IPC::sharedPage> metaPages;///< For each track, holds the page that describes which dataPages are mapped
std::map<unsigned long, unsigned long> curPageNum;///< For each track, holds the number page that is currently being written.
std::map<unsigned long, IPC::sharedPage> curPage;///< For each track, holds the page that is currently being written.
IPC::sharedClient userClient;///< Shared memory used for connection to Mixer process.
std::string streamName;///< Name of the stream to connect to
bool encrypt;
Encryption::verimatrixData vmData;
std::map<int,unsigned long long int> iVecs;
IPC::sharedPage encryptionPage;
void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta);
};
///\brief Class containing all basic input and output functions.
class InOutBase {
public:
@ -37,37 +73,23 @@ namespace Mist {
void bufferRemove(unsigned long tid, unsigned long pageNumber);
void bufferLivePacket(JSON::Value & packet);
void bufferLivePacket(DTSC::Packet & packet);
bool isBuffered(unsigned long tid, unsigned long keyNum);
unsigned long bufferedOnPage(unsigned long tid, unsigned long keyNum);
protected:
void continueNegotiate(unsigned long tid);
bool standAlone;
static Util::Config * config;
void initiateEncryption();//LTS
void continueNegotiate(unsigned long tid);
negotiationProxy nProxy;
DTSC::Packet thisPacket;//The current packet that is being parsed
std::string streamName;///< Name of the stream to connect to
IPC::sharedClient userClient;///< Shared memory used for connection to Mixer process.
std::string streamName;
DTSC::Meta myMeta;///< Stores either the input or output metadata
std::set<unsigned long> selectedTracks;///< Stores the track id's that are either selected for playback or input
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > pagesByTrack;///<Holds the data for all pages of a track. Based on unmapped tids
//Negotiation stuff (from unmapped tid's)
std::map<unsigned long, unsigned long> trackOffset; ///< Offset of data on user page
std::map<unsigned long, negotiationState> trackState; ///< State of the negotiation for tracks
std::map<unsigned long, unsigned long> trackMap;///<Determines which input track maps onto which "final" track
std::map<unsigned long, IPC::sharedPage> metaPages;///< For each track, holds the page that describes which dataPages are mapped
std::map<unsigned long, unsigned long> curPageNum;///< For each track, holds the number page that is currently being written.
std::map<unsigned long, IPC::sharedPage> curPage;///< For each track, holds the page that is currently being written.
std::map<unsigned long, std::deque<DTSC::Packet> > trackBuffer; ///< Buffer to be used during active track negotiation
bool encrypt;
Encryption::verimatrixData vmData;
std::map<int,unsigned long long int> iVecs;
IPC::sharedPage encryptionPage;
DTSC::Meta myMeta;///< Stores either the input or output metadata
};
}

View file

@ -38,6 +38,7 @@ int main(int argc, char * argv[]) {
std::cout << mistOut::capa.toString() << std::endl;
return -1;
}
conf.activate();
if (mistOut::listenMode()){
conf.serveForkedSocket(spawnForked);
}else{

View file

@ -163,8 +163,8 @@ namespace Mist {
if (lock){
liveMeta.wait();
}
if (metaPages[0].mapped){
DTSC::Packet tmpMeta(metaPages[0].mapped, metaPages[0].len, true);
if (nProxy.metaPages[0].mapped){
DTSC::Packet tmpMeta(nProxy.metaPages[0].mapped, nProxy.metaPages[0].len, true);
if (tmpMeta.getVersion()){
myMeta.reinit(tmpMeta);
}
@ -197,7 +197,7 @@ namespace Mist {
if (isInitialized){
return;
}
if (metaPages[0].mapped){
if (nProxy.metaPages[0].mapped){
return;
}
if (streamName.size() < 1){
@ -233,8 +233,8 @@ namespace Mist {
/// Connects or reconnects to the stream.
/// Assumes streamName class member has been set already.
/// Will start input if not currently active, calls onFail() if this does not succeed.
/// After assuring stream is online, clears metaPages, then sets metaPages[0], statsPage and userClient to (hopefully) valid handles.
/// Finally, calls updateMeta() and stats()
/// After assuring stream is online, clears nProxy.metaPages, then sets nProxy.metaPages[0], statsPage and nProxy.userClient to (hopefully) valid handles.
/// Finally, calls updateMeta()
void Output::reconnect(){
if (!Util::startInput(streamName)){
DEBUG_MSG(DLVL_FAIL, "Opening stream failed - aborting initalization");
@ -243,9 +243,9 @@ namespace Mist {
}
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
metaPages.clear();
metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE);
if (!metaPages[0].mapped){
nProxy.metaPages.clear();
nProxy.metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE);
if (!nProxy.metaPages[0].mapped){
FAIL_MSG("Could not connect to server for %s", streamName.c_str());
onFail();
return;
@ -254,13 +254,12 @@ namespace Mist {
statsPage.finish();
}
statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
if (userClient.getData()){
userClient.finish();
if (nProxy.userClient.getData()){
nProxy.userClient.finish();
}
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
stats();
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
updateMeta();
}
@ -409,14 +408,14 @@ namespace Mist {
}
int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){
if (!metaPages.count(trackId)){
if (!nProxy.metaPages.count(trackId)){
char id[NAME_BUFFER_SIZE];
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), trackId);
metaPages[trackId].init(id, 8 * 1024);
nProxy.metaPages[trackId].init(id, 8 * 1024);
}
int len = metaPages[trackId].len / 8;
int len = nProxy.metaPages[trackId].len / 8;
for (int i = 0; i < len; i++){
int * tmpOffset = (int *)(metaPages[trackId].mapped + (i * 8));
int * tmpOffset = (int *)(nProxy.metaPages[trackId].mapped + (i * 8));
long amountKey = ntohl(tmpOffset[1]);
if (amountKey == 0){continue;}
long tmpKey = ntohl(tmpOffset[0]);
@ -429,7 +428,7 @@ namespace Mist {
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){
curPage.erase(trackId);
nProxy.curPage.erase(trackId);
currKeyOpen.erase(trackId);
return;
}
@ -448,7 +447,7 @@ namespace Mist {
}
if (timeout > 100){
DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page %lld for track %lu. Aborting.", keyNum, trackId);
curPage.erase(trackId);
nProxy.curPage.erase(trackId);
currKeyOpen.erase(trackId);
return;
}
@ -475,9 +474,9 @@ namespace Mist {
}
char id[NAME_BUFFER_SIZE];
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackId, pageNum);
curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE);
if (!(curPage[trackId].mapped)){
DEBUG_MSG(DLVL_FAIL, "Initializing page %s failed", curPage[trackId].name.c_str());
nProxy.curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE);
if (!(nProxy.curPage[trackId].mapped)){
DEBUG_MSG(DLVL_FAIL, "Initializing page %s failed", nProxy.curPage[trackId].name.c_str());
return;
}
currKeyOpen[trackId] = pageNum;
@ -503,7 +502,7 @@ namespace Mist {
bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){
loadPageForKey(tid, getKeyForTime(tid, pos) + (getNextKey?1:0));
if (!curPage.count(tid) || !curPage[tid].mapped){
if (!nProxy.curPage.count(tid) || !nProxy.curPage[tid].mapped){
INFO_MSG("Aborting seek to %llums in track %u, not available.", pos, tid);
return false;
}
@ -511,9 +510,9 @@ namespace Mist {
tmp.tid = tid;
tmp.offset = 0;
DTSC::Packet tmpPack;
tmpPack.reInit(curPage[tid].mapped + tmp.offset, 0, true);
tmpPack.reInit(nProxy.curPage[tid].mapped + tmp.offset, 0, true);
tmp.time = tmpPack.getTime();
char * mpd = curPage[tid].mapped;
char * mpd = nProxy.curPage[tid].mapped;
while ((long long)tmp.time < pos && tmpPack){
tmp.offset += tmpPack.getDataLen();
tmpPack.reInit(mpd + tmp.offset, 0, true);
@ -524,15 +523,15 @@ namespace Mist {
return true;
}else{
//don't print anything for empty packets - not sign of corruption, just unfinished stream.
if (curPage[tid].mapped[tmp.offset] != 0){
if (nProxy.curPage[tid].mapped[tmp.offset] != 0){
DEBUG_MSG(DLVL_FAIL, "Noes! Couldn't find packet on track %d because of some kind of corruption error or somesuch.", tid);
}else{
VERYHIGH_MSG("Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset);
unsigned int i = 0;
while (curPage[tid].mapped[tmp.offset] == 0 && ++i < 42){
while (nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i < 42){
Util::wait(100);
}
if (curPage[tid].mapped[tmp.offset] == 0){
if (nProxy.curPage[tid].mapped[tmp.offset] == 0){
FAIL_MSG("Track %d no data (key %u) - timeout", tid, getKeyForTime(tid, pos) + (getNextKey?1:0));
}else{
return seek(tid, pos, getNextKey);
@ -879,7 +878,7 @@ namespace Mist {
/*LTS-END*/
stats();
userClient.finish();
nProxy.userClient.finish();
statsPage.finish();
myConn.close();
return 0;
@ -941,15 +940,15 @@ namespace Mist {
DEBUG_MSG(DLVL_DONTEVEN, "Loading track %u (next=%lu), %llu ms", nxt.tid, nxtKeyNum[nxt.tid], nxt.time);
if (nxt.offset >= curPage[nxt.tid].len){
if (nxt.offset >= nProxy.curPage[nxt.tid].len){
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
nxt.offset = 0;
if (curPage.count(nxt.tid) && curPage[nxt.tid].mapped){
if (getDTSCTime(curPage[nxt.tid].mapped, nxt.offset) < nxt.time){
if (nProxy.curPage.count(nxt.tid) && nProxy.curPage[nxt.tid].mapped){
if (getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset) < nxt.time){
ERROR_MSG("Time going backwards in track %u - dropping track.", nxt.tid);
}else{
nxt.time = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset);
nxt.time = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset);
buffer.insert(nxt);
}
prepareNext();
@ -957,7 +956,7 @@ namespace Mist {
}
}
if (!curPage.count(nxt.tid) || !curPage[nxt.tid].mapped){
if (!nProxy.curPage.count(nxt.tid) || !nProxy.curPage[nxt.tid].mapped){
//mapping failure? Drop this track and go to next.
//not an error - usually means end of stream.
DEBUG_MSG(DLVL_DEVEL, "Track %u no page - dropping track.", nxt.tid);
@ -966,7 +965,7 @@ namespace Mist {
}
//have we arrived at the end of the memory page? (4 zeroes mark the end)
if (!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4)){
if (!memcmp(nProxy.curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4)){
//if we don't currently know where we are, we're lost. We should drop the track.
if (!nxt.time){
DEBUG_MSG(DLVL_DEVEL, "Timeless empty packet on track %u - dropping track.", nxt.tid);
@ -994,8 +993,8 @@ namespace Mist {
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
nxt.offset = 0;
if (curPage.count(nxt.tid) && curPage[nxt.tid].mapped){
unsigned long long nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset);
if (nProxy.curPage.count(nxt.tid) && nProxy.curPage[nxt.tid].mapped){
unsigned long long nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset);
if (nextTime && nextTime < nxt.time){
DEBUG_MSG(DLVL_DEVEL, "Time going backwards in track %u - dropping track.", nxt.tid);
}else{
@ -1012,7 +1011,7 @@ namespace Mist {
prepareNext();
return;
}
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
thisPacket.reInit(nProxy.curPage[nxt.tid].mapped + nxt.offset, 0, true);
if (thisPacket){
if (thisPacket.getTime() != nxt.time && nxt.time){
WARN_MSG("Loaded track %ld@%llu instead of %ld@%llu", thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time);
@ -1073,9 +1072,9 @@ namespace Mist {
completeKeyReadyTimeOut = false;
}
}
if (curPage[nxt.tid]){
if (nxt.offset < curPage[nxt.tid].len){
unsigned long long nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset);
if (nProxy.curPage[nxt.tid]){
if (nxt.offset < nProxy.curPage[nxt.tid].len){
unsigned long long nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset);
if (nextTime){
nxt.time = nextTime;
}else{
@ -1123,25 +1122,25 @@ namespace Mist {
}
}
int tNum = 0;
if (!userClient.getData()){
if (!nProxy.userClient.getData()){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
if (!userClient.getData()){
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
if (!nProxy.userClient.getData()){
DEBUG_MSG(DLVL_WARN, "Player connection failure - aborting output");
myConn.close();
return;
}
}
if (!trackMap.size()){
IPC::userConnection userConn(userClient.getData());
if (!nProxy.trackMap.size()){
IPC::userConnection userConn(nProxy.userClient.getData());
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){
userConn.setTrackId(tNum, *it);
userConn.setKeynum(tNum, nxtKeyNum[*it]);
tNum ++;
}
}
userClient.keepAlive();
nProxy.userClient.keepAlive();
if (tNum > SIMUL_TRACKS){
DEBUG_MSG(DLVL_WARN, "Too many tracks selected, using only first %d", SIMUL_TRACKS);
}

View file

@ -5,38 +5,36 @@
namespace Mist {
///\brief Builds an index file for HTTP Live streaming.
///\return The index file for HTTP Live Streaming.
std::string OutHLS::liveIndex(){
std::string OutHLS::liveIndex() {
std::stringstream result;
result << "#EXTM3U\r\n";
int audioId = -1;
std::string audioName;
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.codec == "AAC" || it->second.codec == "MP3" || it->second.codec == "AC3"){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.codec == "AAC" || it->second.codec == "MP3" || it->second.codec == "AC3") {
audioId = it->first;
audioName = it->second.getIdentifier();
break;
}
}
unsigned int vidTracks = 0;
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.codec == "H264" || it->second.codec == "HEVC"){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.codec == "H264" || it->second.codec == "HEVC") {
vidTracks++;
int bWidth = it->second.bps;
if (bWidth < 5){
if (bWidth < 5) {
bWidth = 5;
}
if (audioId != -1){
if (audioId != -1) {
bWidth += myMeta.tracks[audioId].bps;
}
result << "#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=" << (bWidth * 8) << "\r\n";
result << it->first;
if (audioId != -1){
if (audioId != -1) {
result << "_" << audioId;
}
result << "/index.m3u8\r\n";
}
}
if (!vidTracks && audioId){
if (!vidTracks && audioId) {
result << "#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=" << (myMeta.tracks[audioId].bps * 8) << "\r\n";
result << audioId << "/index.m3u8\r\n";
}
@ -44,83 +42,242 @@ namespace Mist {
return result.str();
}
std::string OutHLS::liveIndex(int tid){
std::string OutHLS::pushLiveIndex(){
std::stringstream result;
result << "#EXTM3U\r\n";
std::set<unsigned int> audioTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.codec == "AAC" || it->second.codec == "MP3" || it->second.codec == "AC3") {
audioTracks.insert(it->first);
}
}
if (!audioTracks.size()){
audioTracks.insert(-1);
}
unsigned int vidTracks = 0;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.codec == "H264" || it->second.codec == "HEVC") {
for (std::set<unsigned int>::iterator audIt = audioTracks.begin(); audIt != audioTracks.end(); audIt++){
vidTracks++;
int bWidth = it->second.bps;
if (bWidth < 5) {
bWidth = 5;
}
if (*audIt != -1) {
bWidth += myMeta.tracks[*audIt].bps;
}
result << "#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=" << (bWidth * 8) << "\r\n";
result << it->first;
if (*audIt != -1) {
result << "_" << *audIt;
}
result << "/index.m3u8\r\n";
}
}
}
if (!vidTracks && audioTracks.size()) {
result << "#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=" << (myMeta.tracks[*audioTracks.begin()].bps * 8) << "\r\n";
result << *audioTracks.begin() << "/index.m3u8\r\n";
}
DEBUG_MSG(DLVL_HIGH, "Sending this index: %s", result.str().c_str());
return result.str();
}
std::string OutHLS::pushLiveIndex(int tid, unsigned long bTime, unsigned long eTime){
updateMeta();
if (!myMeta.tracks[tid].fragments.size()) {
DEBUG_MSG(DLVL_FAIL, "liveIndex called with track %d, which has no fragments!", tid);
return "";
}
std::stringstream result;
//parse single track
int longestFragment = 0;
for (std::deque<DTSC::Fragment>::iterator it = myMeta.tracks[tid].fragments.begin(); (it + 1) != myMeta.tracks[tid].fragments.end(); it++) {
if (it->getDuration() > longestFragment) {
longestFragment = it->getDuration();
}
}
if ((myMeta.tracks[tid].lastms - myMeta.tracks[tid].firstms) / myMeta.tracks[tid].fragments.size() > longestFragment) {
longestFragment = (myMeta.tracks[tid].lastms - myMeta.tracks[tid].firstms) / myMeta.tracks[tid].fragments.size();
}
result << "#EXTM3U\r\n#EXT-X-TARGETDURATION:" << (longestFragment / 1000) + 1 << "\r\n";
std::deque<std::string> lines;
unsigned int skippedLines = 0;
for (std::deque<DTSC::Fragment>::iterator it = myMeta.tracks[tid].fragments.begin(); it != myMeta.tracks[tid].fragments.end(); it++) {
long long int starttime = myMeta.tracks[tid].getKey(it->getNumber()).getTime();
long long duration = it->getDuration();
if (duration <= 0) {
duration = myMeta.tracks[tid].lastms - starttime;
}
if (starttime < bTime){
skippedLines++;
}
if (starttime >= bTime && (starttime + duration) <= eTime){
char lineBuf[400];
snprintf(lineBuf, 400, "#EXTINF:%lld, no desc\r\n%lld_%lld.ts\r\n", ((duration + 500) / 1000), starttime, starttime + duration);
lines.push_back(lineBuf);
}
}
result << "#EXT-X-MEDIA-SEQUENCE:" << myMeta.tracks[tid].missedFrags + skippedLines << "\r\n";
while (lines.size()) {
result << lines.front();
lines.pop_front();
}
if (!myMeta.live && eTime >= myMeta.tracks[tid].lastms) {
result << "#EXT-X-ENDLIST\r\n";
}
DEBUG_MSG(DLVL_HIGH, "Sending this index: %s", result.str().c_str());
return result.str();
}
std::string OutHLS::liveIndex(int tid) {
updateMeta();
std::stringstream result;
//parse single track
int longestFragment = 0;
if (!myMeta.tracks[tid].fragments.size()){
if (!myMeta.tracks[tid].fragments.size()) {
DEBUG_MSG(DLVL_FAIL, "liveIndex called with track %d, which has no fragments!", tid);
return "";
}
for (std::deque<DTSC::Fragment>::iterator it = myMeta.tracks[tid].fragments.begin(); (it + 1) != myMeta.tracks[tid].fragments.end(); it++){
if (it->getDuration() > longestFragment){
for (std::deque<DTSC::Fragment>::iterator it = myMeta.tracks[tid].fragments.begin(); (it + 1) != myMeta.tracks[tid].fragments.end(); it++) {
if (it->getDuration() > longestFragment) {
longestFragment = it->getDuration();
}
}
if ((myMeta.tracks[tid].lastms - myMeta.tracks[tid].firstms) / myMeta.tracks[tid].fragments.size() > longestFragment){
if ((myMeta.tracks[tid].lastms - myMeta.tracks[tid].firstms) / myMeta.tracks[tid].fragments.size() > longestFragment) {
longestFragment = (myMeta.tracks[tid].lastms - myMeta.tracks[tid].firstms) / myMeta.tracks[tid].fragments.size();
}
result << "#EXTM3U\r\n#EXT-X-TARGETDURATION:" << (longestFragment / 1000) + 1 << "\r\n";
std::deque<std::string> lines;
for (std::deque<DTSC::Fragment>::iterator it = myMeta.tracks[tid].fragments.begin(); it != myMeta.tracks[tid].fragments.end(); it++){
for (std::deque<DTSC::Fragment>::iterator it = myMeta.tracks[tid].fragments.begin(); it != myMeta.tracks[tid].fragments.end(); it++) {
long long int starttime = myMeta.tracks[tid].getKey(it->getNumber()).getTime();
std::stringstream line;
long long duration = it->getDuration();
if (duration <= 0){
if (duration <= 0) {
duration = myMeta.tracks[tid].lastms - starttime;
}
line << "#EXTINF:" << ((duration + 500) / 1000) << ", no desc\r\n" << starttime << "_" << duration + starttime << ".ts\r\n";
lines.push_back(line.str());
char lineBuf[400];
snprintf(lineBuf, 400, "#EXTINF:%lld, no desc\r\n%lld_%lld,ts\r\n", ((duration + 500) / 1000), starttime, starttime + duration);
lines.push_back(lineBuf);
}
//skip the first fragment if live and there are more than 2 fragments.
unsigned int skippedLines = 0;
if (myMeta.live){
//only print the last segment when VoD
lines.pop_back();
/*LTS-START*/
unsigned int skip = (( myMeta.tracks[tid].fragments.size()-1) * config->getInteger("startpos")) / 1000u;
while (skippedLines < skip && lines.size()){
if (myMeta.live) {
if (lines.size() > 2) {
lines.pop_front();
skippedLines++;
}
if (config->getInteger("listlimit")){
//only print the last segment when VoD
lines.pop_back();
/*LTS-START*/
unsigned int skip = ((myMeta.tracks[tid].fragments.size() - 1) * config->getInteger("startpos")) / 1000u;
while (skippedLines < skip && lines.size()) {
lines.pop_front();
skippedLines++;
}
if (config->getInteger("listlimit")) {
unsigned long listlimit = config->getInteger("listlimit");
while (lines.size() > listlimit){
while (lines.size() > listlimit) {
lines.pop_front();
skippedLines++;
}
}
/*LTS-END*/
}
result << "#EXT-X-MEDIA-SEQUENCE:" << myMeta.tracks[tid].missedFrags + skippedLines << "\r\n";
while (lines.size()){
while (lines.size()) {
result << lines.front();
lines.pop_front();
}
if ( !myMeta.live){
if (!myMeta.live) {
result << "#EXT-X-ENDLIST\r\n";
}
DEBUG_MSG(DLVL_HIGH, "Sending this index: %s", result.str().c_str());
return result.str();
} //liveIndex
OutHLS::OutHLS(Socket::Connection & conn) : TSOutput(conn){
std::string OutHLS::generatePushList() {
updateMeta();
std::set<unsigned int> videoTracks;
std::set<unsigned int> audioTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.codec == "AAC" || it->second.codec == "MP3" || it->second.codec == "AC3") {
audioTracks.insert(it->first);
}
if (it->second.codec == "H264" || it->second.codec == "H265"){
videoTracks.insert(it->first);
}
}
JSON::Value result;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
std::stringstream tid;
tid << it->second.trackID;
result["tracks"][tid.str()] = it->second.toJSON(true);
}
for(std::set<unsigned int>::iterator it = videoTracks.begin(); it != videoTracks.end(); it++){
for(std::set<unsigned int>::iterator it2 = audioTracks.begin(); it2 != audioTracks.end(); it2++){
JSON::Value quality;
std::stringstream identifier;
identifier << "/" << *it << "_" << *it2;
quality["index"] = "/push" + identifier.str() + "/index_\%llu_\%llu.m3u8";
quality["segment"] = identifier.str() + "/\%llu_\%llu.ts";
quality["video"] = *it;
quality["audio"] = *it2;
quality["id"] = identifier.str();
std::deque<DTSC::Fragment>::iterator it3 = myMeta.tracks[*it].fragments.begin();
for (int i = 0; i < 2; i++){
if (it3 != myMeta.tracks[*it].fragments.end()){
++it3;
}
}
for (; it3 != myMeta.tracks[*it].fragments.end(); it3++) {
if (myMeta.live && it3 == (myMeta.tracks[*it].fragments.end() - 1)){
//Skip the current last fragment if we are live
continue;
}
long long int starttime = myMeta.tracks[*it].getKey(it3->getNumber()).getTime();
std::stringstream line;
long long duration = it3->getDuration();
if (duration <= 0) {
duration = myMeta.tracks[*it].lastms - starttime;
}
std::stringstream segmenturl;
segmenturl << identifier.str() << "/" << starttime << "_" << duration + starttime << ".ts";
JSON::Value segment;
//segment["url"] = segmenturl.str();
segment["time"] = starttime;
segment["duration"] = duration;
segment["number"] = (unsigned int)it3->getNumber();
quality["segments"].append(segment);
}
result["qualities"].append(quality);
}
}
return result.toString();;
}
OutHLS::OutHLS(Socket::Connection & conn) : TSOutput(conn) {
realTime = 0;
}
OutHLS::~OutHLS() {}
void OutHLS::init(Util::Config * cfg){
void OutHLS::init(Util::Config * cfg) {
HTTPOutput::init(cfg);
capa["name"] = "HLS";
capa["desc"] = "Enables HTTP protocol Apple-specific streaming (also known as HLS).";
capa["url_rel"] = "/hls/$/index.m3u8";
capa["url_prefix"] = "/hls/$/";
capa["url_pushlist"] = "/hls/$/push/list";
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][1u].append("AAC");
@ -139,29 +296,29 @@ namespace Mist {
/*LTS-END*/
}
int OutHLS::canSeekms(unsigned int ms){
int OutHLS::canSeekms(unsigned int ms) {
//no tracks? Frame too new by definition.
if ( !myMeta.tracks.size()){
if (!myMeta.tracks.size()) {
return 1;
}
//check main track
DTSC::Track & mainTrack = myMeta.tracks[*selectedTracks.begin()];
//return "too late" if one track is past this point
if (ms < mainTrack.firstms){
return -1;
}
//return "too early" if one track is not yet at this point
if (ms > mainTrack.lastms){
return 1;
//loop trough all the tracks
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
//return "too late" if one track is past this point
if (ms < it->second.firstms) {
return -1;
}
//return "too early" if one track is not yet at this point
if (ms > it->second.lastms) {
return 1;
}
}
return 0;
}
void OutHLS::onHTTP(){
void OutHLS::onHTTP() {
std::string method = H.method;
if (H.url == "/crossdomain.xml"){
if (H.url == "/crossdomain.xml") {
H.Clean();
H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
@ -176,27 +333,81 @@ namespace Mist {
H.Clean(); //clean for any possible next requests
return;
} //crossdomain.xml
if (H.url.find("hls") == std::string::npos){
if (H.method == "OPTIONS") {
H.Clean();
H.SetHeader("Content-Type", "application/octet-stream");
H.SetHeader("Cache-Control", "no-cache");
H.setCORSHeaders();
H.SetBody("");
H.SendResponse("200", "OK", myConn);
H.Clean();
return;
}
if (H.url.find("hls") == std::string::npos) {
myConn.close();
return;
}
appleCompat = (H.GetHeader("User-Agent").find("iPad") != std::string::npos) || (H.GetHeader("User-Agent").find("iPod") != std::string::npos)|| (H.GetHeader("User-Agent").find("iPhone") != std::string::npos);
appleCompat = (H.GetHeader("User-Agent").find("iPad") != std::string::npos) || (H.GetHeader("User-Agent").find("iPod") != std::string::npos) || (H.GetHeader("User-Agent").find("iPhone") != std::string::npos);
bool VLCworkaround = false;
if (H.GetHeader("User-Agent").substr(0, 3) == "VLC"){
if (H.GetHeader("User-Agent").substr(0, 3) == "VLC") {
std::string vlcver = H.GetHeader("User-Agent").substr(4);
if (vlcver[0] == '0' || vlcver[0] == '1' || (vlcver[0] == '2' && vlcver[2] < '2')){
if (vlcver[0] == '0' || vlcver[0] == '1' || (vlcver[0] == '2' && vlcver[2] < '2')) {
DEBUG_MSG(DLVL_INFO, "Enabling VLC version < 2.2.0 bug workaround.");
VLCworkaround = true;
}
}
initialize();
if (H.url.find(".m3u") == std::string::npos){
std::string tmpStr = H.getUrl().substr(5+streamName.size());
if (H.url.substr(5 + streamName.size(), 5) == "/push"){
std::string relPushUrl = H.url.substr(10 + streamName.size());
H.Clean();
if (relPushUrl == "/list"){
H.SetBody(generatePushList());
H.SendResponse("200", "OK", myConn);
H.Clean();
return;
}
if (relPushUrl.find(".m3u8") != std::string::npos) {
H.SetHeader("Content-Type", "audio/x-mpegurl");
} else {
H.SetHeader("Content-Type", "audio/mpegurl");
}
if (relPushUrl == "/index.m3u8"){
H.SetHeader("Cache-Control", "no-cache");
H.setCORSHeaders();
H.SetBody(pushLiveIndex());
H.SendResponse("200", "OK", myConn);
H.Clean(); //clean for any possible next requests
return;
}else {
unsigned int vTrack;
unsigned int aTrack;
unsigned long long bTime;
unsigned long long eTime;
if (sscanf(relPushUrl.c_str(), "/%u_%u/index_%llu_%llu.m3u", &vTrack, &aTrack, &bTime, &eTime) == 4) {
if (eTime < bTime){
eTime = bTime;
}
H.SetHeader("Cache-Control", "no-cache");
H.setCORSHeaders();
H.SetBody(pushLiveIndex(vTrack, bTime, eTime));
H.SendResponse("200", "OK", myConn);
H.Clean(); //clean for any possible next requests
return;
}
}
H.SetBody("The HLS URL wasn't understood - what did you want, exactly?\n");
myConn.SendNow(H.BuildResponse("404", "URL mismatch"));
H.Clean(); //clean for any possible next requests
return;
}else if (H.url.find(".m3u") == std::string::npos) {
std::string tmpStr = H.getUrl().substr(5 + streamName.size());
long long unsigned int from;
if (sscanf(tmpStr.c_str(), "/%u_%u/%llu_%llu.ts", &vidTrack, &audTrack, &from, &until) != 4){
if (sscanf(tmpStr.c_str(), "/%u/%llu_%llu.ts", &vidTrack, &from, &until) != 3){
if (sscanf(tmpStr.c_str(), "/%u_%u/%llu_%llu.ts", &vidTrack, &audTrack, &from, &until) != 4) {
if (sscanf(tmpStr.c_str(), "/%u/%llu_%llu.ts", &vidTrack, &from, &until) != 3) {
DEBUG_MSG(DLVL_MEDIUM, "Could not parse URL: %s", H.getUrl().c_str());
H.Clean();
H.setCORSHeaders();
@ -204,33 +415,38 @@ namespace Mist {
myConn.SendNow(H.BuildResponse("404", "URL mismatch"));
H.Clean(); //clean for any possible next requests
return;
}else{
} else {
selectedTracks.clear();
selectedTracks.insert(vidTrack);
}
}else{
} else {
selectedTracks.clear();
selectedTracks.insert(vidTrack);
selectedTracks.insert(audTrack);
}
if (myMeta.live){
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.codec == "ID3"){
selectedTracks.insert(it->first);
}
}
if (myMeta.live) {
unsigned int timeout = 0;
int seekable;
do {
seekable = canSeekms(from);
/// \todo Detection of out-of-range parts.
if (seekable > 0){
if (seekable > 0) {
//time out after 21 seconds
if (++timeout > 42){
if (++timeout > 42) {
myConn.close();
break;
}
Util::sleep(500);
updateMeta();
}
}while (myConn && seekable > 0);
if (seekable < 0){
} while (myConn && seekable > 0);
if (seekable < 0) {
H.Clean();
H.setCORSHeaders();
H.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
@ -240,7 +456,7 @@ namespace Mist {
return;
}
}
seek(from);
ts_from = from;
lastVid = from * 90;
@ -256,12 +472,12 @@ namespace Mist {
H.StartResponse(H, myConn, VLCworkaround);
unsigned int fragCounter = myMeta.tracks[vidTrack].missedFrags;
for (std::deque<DTSC::Fragment>::iterator it = myMeta.tracks[vidTrack].fragments.begin(); it != myMeta.tracks[vidTrack].fragments.end(); it++){
long long int starttime = myMeta.tracks[vidTrack].getKey(it->getNumber()).getTime();
if (starttime <= from && starttime + it->getDuration() > from){
EXTREME_MSG("setting continuity counter for PAT/PMT to %d",fragCounter);
contCounters[0]=fragCounter; //PAT continuity counter
contCounters[4096]=fragCounter; //PMT continuity counter
for (std::deque<DTSC::Fragment>::iterator it = myMeta.tracks[vidTrack].fragments.begin(); it != myMeta.tracks[vidTrack].fragments.end(); it++) {
long long int starttime = myMeta.tracks[vidTrack].getKey(it->getNumber()).getTime();
if (starttime <= from && starttime + it->getDuration() > from) {
EXTREME_MSG("setting continuity counter for PAT/PMT to %d", fragCounter);
contCounters[0] = fragCounter; //PAT continuity counter
contCounters[4096] = fragCounter; //PMT continuity counter
break;
}
++fragCounter;
@ -269,13 +485,13 @@ namespace Mist {
packCounter = 0;
parseData = true;
wantRequest = false;
}else{
} else {
initialize();
std::string request = H.url.substr(H.url.find("/", 5) + 1);
H.Clean();
if (H.url.find(".m3u8") != std::string::npos){
if (H.url.find(".m3u8") != std::string::npos) {
H.SetHeader("Content-Type", "audio/x-mpegurl");
}else{
} else {
H.SetHeader("Content-Type", "audio/mpegurl");
}
H.SetHeader("Cache-Control", "no-cache");
@ -286,10 +502,10 @@ namespace Mist {
return;
}
std::string manifest;
if (request.find("/") == std::string::npos){
if (request.find("/") == std::string::npos) {
manifest = liveIndex();
}else{
int selectId = atoi(request.substr(0,request.find("/")).c_str());
} else {
int selectId = atoi(request.substr(0, request.find("/")).c_str());
manifest = liveIndex(selectId);
}
H.SetBody(manifest);
@ -298,7 +514,7 @@ namespace Mist {
}
void OutHLS::sendTS(const char * tsData, unsigned int len){
void OutHLS::sendTS(const char * tsData, unsigned int len) {
H.Chunkify(tsData, len, myConn);
}
}

View file

@ -12,6 +12,13 @@ namespace Mist {
protected:
std::string liveIndex();
std::string liveIndex(int tid);
std::string pushLiveIndex();
std::string pushLiveIndex(int tid, unsigned long bTime, unsigned long eTime);
std::string generatePushList();
int canSeekms(unsigned int ms);
int keysToSend;
unsigned int vidTrack;

View file

@ -280,7 +280,7 @@ namespace Mist {
moof_box.setContent(mfhd_box, 0);
moof_box.setContent(traf_box, 1);
/*LTS-START*/
if (encrypt){
if (nProxy.encrypt){
MP4::UUID_SampleEncryption sEnc;
sEnc.setVersion(0);
if (myMeta.tracks[tid].type == "audio") {
@ -340,10 +340,10 @@ namespace Mist {
//Load the encryption data page
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_ENCRYPT, streamName.c_str());
encryptionPage.init(pageName, 8 * 1024 * 1024, false, false);
if (encryptionPage.mapped) {
vmData.read(encryptionPage.mapped);
encrypt = true;
nProxy.encryptionPage.init(pageName, 8 * 1024 * 1024, false, false);
if (nProxy.encryptionPage.mapped) {
nProxy.vmData.read(nProxy.encryptionPage.mapped);
nProxy.encrypt = true;
}
encryptionLoaded = true;
}
@ -352,9 +352,9 @@ namespace Mist {
std::string OutHSS::protectionHeader() {
loadEncryption();
std::string xmlGen = "<WRMHEADER xmlns=\"http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader\" version=\"4.0.0.0\"><DATA><PROTECTINFO><KEYLEN>16</KEYLEN><ALGID>AESCTR</ALGID></PROTECTINFO><KID>";
xmlGen += vmData.keyid;
xmlGen += nProxy.vmData.keyid;
xmlGen += "</KID><LA_URL>";
xmlGen += vmData.laurl;
xmlGen += nProxy.vmData.laurl;
xmlGen += "</LA_URL></DATA></WRMHEADER>";
std::string tmp = toUTF16(xmlGen);
tmp = tmp.substr(2);
@ -512,7 +512,7 @@ namespace Mist {
Result << "</StreamIndex>\n";
}
/*LTS-START*/
if (encrypt) {
if (nProxy.encrypt) {
Result << "<Protection><ProtectionHeader SystemID=\"9a04f079-9840-4286-ab92-e65be0885f95\">";
Result << protectionHeader();
Result << "</ProtectionHeader></Protection>";

View file

@ -173,7 +173,7 @@ namespace Mist {
if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName){
DEBUG_MSG(DLVL_MEDIUM, "Switching from %s (%s) to %s (%s)", capa["name"].asStringRef().c_str(), streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str());
streamName = H.GetVar("stream");
userClient.finish();
nProxy.userClient.finish();
statsPage.finish();
reConnector(handler);
H.Clean();
@ -396,8 +396,6 @@ namespace Mist {
return trustedProxies.count(ip);
}
/*LTS-END*/
/*begin-roxlu*/
void HTTPOutput::sendResponse(std::string message, std::string code) {

346
src/output/output_push.cpp Normal file
View file

@ -0,0 +1,346 @@
#include "output_push.h"
#include <mist/http_parser.h>
#include <mist/shared_memory.h>
#include <sys/stat.h>
#include <mist/tinythread.h>
#define PUSH_INDEX_SIZE 5 //Build index based on most recent X segments
Util::Config * pConf;
std::string sName;
std::string baseURL;
long long srcPort;
std::string srcHost;
std::string dstHost;
long long dstPort;
std::string dstUrl;
//Used to keep track of all segments that can be pushed
std::map<std::string, std::map<int, std::string> > pushableSegments;
//Used to keep track of the timestamp of each pushableSegment
std::map<std::string, std::map<int, int> > pushableTimes;
//Used to keep track of the duration of each pushableSegment
std::map<std::string, std::map<int, int> > pushableDurations;
//For each quality, store the latest number found in the push list
std::map<std::string, int> latestNumber;
//For each quality, store whether it is currently being pushed.
std::map<std::string, bool> parsing;
//For each quality, store an fprint-style string of the relative url to the index_<beginTime>_<endTime>.m3u8
std::map<std::string, std::string> qualityIndex;
//For each quality, store an fprint-style string of the relative url to the segment.
std::map<std::string, std::string> qualitySegment;
//For each quality, store the last PUSH_INDEX_SIZE - 1 timestamps. Used to generate a time-constrained index.m3u8.
std::map<std::string, std::deque<int> > qualityBeginTimes;
//Parses a uri of the form 'http://<host>[:<port>]/<url>, and split it into variables
void parseURI(const std::string & uri, std::string & host, long long & port, std::string & url){
int loc = 0;
if (uri.find("http://") == 0){
loc += 7;
}
host = uri.substr(loc, uri.find_first_of(":/", 7) - 7);
loc += host.size();
if (uri[loc] == ':'){
port = atoll(uri.c_str() + loc + 1);
loc = uri.find("/", loc);
}
url = uri.substr(loc);
}
//Do an HTTP request, and route it into a post request on a different socket.
void proxyToPost(Socket::Connection & src, const std::string & srcUrl, Socket::Connection & dst, const std::string & dstUrl){
INFO_MSG("Routing %s to %s", srcUrl.c_str(), dstUrl.c_str());
//Send the initial request
HTTP::Parser H;
H.url = srcUrl;
H.SendRequest(src);
H.Clean();
//Read only the headers of the reply
H.headerOnly = true;
while (src.connected()){
if (src.Received().size() || src.spool()){
if (H.Read(src)){
break;
}
}
}
H.headerOnly = false;
INFO_MSG("Reply from %s: %s %s", src.getHost().c_str(), H.url.c_str(), H.method.c_str());
//Change the headers of the reply to form a post request
H.method = "POST";
H.url = dstUrl;
H.protocol = "HTTP/1.1";
H.SetHeader("Host", dstHost);
//Start the post request
H.SendRequest(dst);
//Route the original payload.
H.Proxy(src, dst);
H.Clean();
while (dst.connected()){
if (dst.Received().size() || dst.spool()){
if (H.Read(dst)){
break;
}
}
}
INFO_MSG("Reply from %s: %s %s", dst.getHost().c_str(), H.url.c_str(), H.method.c_str());
}
///Push the first registered segment for this quality
void pushFirstElement(std::string qId) {
std::string semName = "MstPushLock" + sName;
IPC::semaphore pushLock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
std::string url;
int time;
int beginTime;
int duration;
//Wait for exclusive access to all globals
pushLock.wait();
//Retrieve all globals for the segment to be pushed
if (pushableSegments[qId].size()){
url = pushableSegments[qId].begin()->second;
time = pushableTimes[qId].begin()->second;
duration = pushableDurations[qId].begin()->second;
if (qualityBeginTimes[qId].size()){
beginTime = qualityBeginTimes[qId].front();
}else{
beginTime = time;
}
}
//Give up exclusive access to all globals
pushLock.post();
//Return if we do not have a segment to push
if (url == ""){
return;
}
//Create both source and destination connections
Socket::Connection srcConn(srcHost, srcPort, true);
Socket::Connection dstConn(dstHost, dstPort, true);
//Set the locations to push to for this segment
std::string srcLocation = baseURL + url;
std::string dstLocation = dstUrl.substr(0, dstUrl.rfind("/")) + url;
//Push the segment
proxyToPost(srcConn, srcLocation, dstConn, dstLocation);
srcConn = Socket::Connection(srcHost, srcPort, true);
//Set the location to push to for the index containing this segment.
//The index will contain (at most) the last PUSH_INDEX_SIZE segments.
char srcIndex[200];
snprintf(srcIndex, 200, qualityIndex[qId].c_str(), beginTime , time + duration);
srcLocation = baseURL + srcIndex;
dstLocation = dstLocation.substr(0, dstLocation.rfind("/")) + "/index.m3u8";
//Push the index
proxyToPost(srcConn, srcLocation, dstConn, dstLocation);
srcConn = Socket::Connection(srcHost, srcPort, true);
//Set the location to push to for the global index containing all qualities.
srcLocation = baseURL + "/push/index.m3u8";
dstLocation = dstLocation.substr(0, dstLocation.rfind("/"));
dstLocation = dstLocation.substr(0, dstLocation.rfind("/")) + "/index.m3u8";
//Push the global index
proxyToPost(srcConn, srcLocation, dstConn, dstLocation);
//Close both connections
///\todo Make the dstConn "persistent" for each thread?
srcConn.close();
dstConn.close();
//Wait for exclusive access to all globals
pushLock.wait();
//Update all globals to indicate the segment has been pushed correctly
pushableSegments[qId].erase(pushableSegments[qId].begin());
pushableTimes[qId].erase(pushableTimes[qId].begin());
pushableDurations[qId].erase(pushableDurations[qId].begin());
qualityBeginTimes[qId].push_back(time);
//Remove the first elements fromt he beginTimes map to make sure we have PUSH_INDEX_SIZE elements in our index.
//We use -1 here, because we use the segment to currently push as well as everything stored in the map
while (qualityBeginTimes[qId].size() > PUSH_INDEX_SIZE - 1){
qualityBeginTimes[qId].pop_front();
}
//Give up exclusive access to all globals
pushLock.post();
}
///Thread used to push data.
void pushThread(void * nullPointer){
std::string myThread;
//Attempt to claim a non-claimed quality.
std::string semName = "MstPushClaim" + sName;
IPC::semaphore pushThreadLock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
pushThreadLock.wait();
for (std::map<std::string, std::map<int, std::string> >::iterator it = pushableSegments.begin(); it != pushableSegments.end(); it++){
if (it->second.size()){//Make sure we dont try to "claim" pushing an empty track
if (!parsing.count(it->first) || !parsing[it->first]){
INFO_MSG("Claiming thread %s", it->first.c_str());
myThread = it->first;
parsing[it->first] = true;
break;
}
}
}
pushThreadLock.post();
//Return if we were unable to claim a quality
if (myThread == ""){
INFO_MSG("No thread claimed");
return;
}
//While this output is active, push the first element in the list
while (pConf->is_active){
pushFirstElement(myThread);
if (!pushableSegments[myThread].size()){
Util::sleep(1000);
}
}
parsing[myThread] = false;
}
namespace Mist {
OutPush::OutPush(Socket::Connection & conn) : Output(conn){
config->activate();
}
OutPush::~OutPush(){}
void OutPush::requestHandler() {
//Set aal basic data only the first time.
if (streamName == ""){
srcPort = 80;
parseURI(config->getString("pushlist"), srcHost, srcPort, pushURL);
dstPort = 80;
parseURI(config->getString("destination"), dstHost, dstPort, dstUrl);
//Strip "/push/list" from the URL
baseURL = pushURL.substr(0, pushURL.rfind("/"));
baseURL = baseURL.substr(0, baseURL.rfind("/"));
//Locate the streamname from the pushURL
int loc = baseURL.find("/", 1) + 1;
streamName = pushURL.substr(loc, pushURL.rfind("/") - loc);
sName = streamName;
INFO_MSG("host: %s, port %lld, url %s, baseURL %s, streamName %s", srcHost.c_str(), srcPort, pushURL.c_str(), baseURL.c_str(), streamName.c_str());
}
//Reconnect when disconnected
if (!listConn.connected()){
listConn = Socket::Connection(srcHost, srcPort, true);
}
//Request the push list
if (listConn.connected()){
HTTP::Parser hReq;
hReq.url = baseURL + "/push/list";
hReq.SendRequest(listConn);
hReq.Clean();
//Read the entire response, not just the headers!
while (!hReq.Read(listConn) && listConn.connected()){
Util::sleep(100);
listConn.spool();
}
//Construct and parse the json list
JSON::Value reply = JSON::fromString(hReq.body);
int numQualities = reply["qualities"].size();
for (int i = 0; i < numQualities; i++){
JSON::Value & qRef = reply["qualities"][i];
std::string qId = qRef["id"].asString();
//Set both the index and segment urls when not yet set.
if (!qualityIndex.count(qId)){
qualityIndex[qId] = qRef["index"].asString();
qualitySegment[qId] = qRef["segment"].asString();
}
//Save latest segment number before parsing
int curLatestNumber = latestNumber[qId];
//Loop over all segments
for (int j = 0; j < qRef["segments"].size(); j++){
JSON::Value & segRef = qRef["segments"][j];
int thisNumber = segRef["number"].asInt();
//Check if this segment is newer than the newest segment before parsing
if (thisNumber > curLatestNumber){
//If it is the highest so far, store its number
if (thisNumber > latestNumber[qId]){
latestNumber[qId] = thisNumber;
}
//If it is not yet added, add it.
if (!pushableSegments[qId].count(thisNumber)){
char segmentUrl[200];
//The qualitySegment map contains a printf-style string
snprintf(segmentUrl, 200, qualitySegment[qId].c_str(), segRef["time"].asInt(), segRef["time"].asInt() + segRef["duration"].asInt());
pushableSegments[qId][segRef["number"].asInt()] = segmentUrl;
pushableTimes[qId][segRef["number"].asInt()] = segRef["time"].asInt();
pushableDurations[qId][segRef["number"].asInt()] = segRef["duration"].asInt();
}
}
}
}
}
//Calculate how many qualities are not yet being pushed
int threadsToSpawn = pushableSegments.size();
for (std::map<std::string, std::map<int, std::string> >::iterator it = pushableSegments.begin(); it != pushableSegments.end(); it++){
if (parsing.count(it->first) && parsing[it->first]){
threadsToSpawn --;
}
}
//And start a thread for each unpushed quality.
//Threads determine which quality to push for themselves.
for (int i = 0; i < threadsToSpawn; i++){
tthread::thread thisThread(pushThread, 0);
thisThread.detach();
}
Util::sleep(100);
}
void OutPush::init(Util::Config * cfg){
Output::init(cfg);
capa["name"] = "Push";
capa["desc"] = "Enables HTTP Pushing.";
capa["required"]["pushlist"]["name"] = "URL location of the pushing list";
capa["required"]["pushlist"]["help"] = "This is the location that will be checked for pushable data.";
capa["required"]["pushlist"]["option"] = "--pushlist";
capa["required"]["pushlist"]["type"] = "str";
cfg->addOption("pushlist", JSON::fromString("{\"arg\":\"string\",\"short\":\"p\",\"long\":\"pushlist\",\"help\":\"This is the location that will be checked for pushable data.\"}"));
capa["required"]["destination"]["name"] = "URL location of the destination";
capa["required"]["destination"]["help"] = "This is the location that the date will be pushed to.";
capa["required"]["destination"]["option"] = "--destination";
capa["required"]["destination"]["type"] = "str";
cfg->addOption("destination", JSON::fromString("{\"arg\":\"string\",\"short\":\"D\",\"long\":\"destination\",\"help\":\"This is the location that the data will be checked for pushed to.\"}"));
cfg->addBasicConnectorOptions(capa);
pConf = cfg;
config = cfg;
}
}

19
src/output/output_push.h Normal file
View file

@ -0,0 +1,19 @@
#include <mist/socket.h>
#include "output.h"
namespace Mist {
class OutPush : public Output {
public:
OutPush(Socket::Connection & conn);
~OutPush();
static bool listenMode(){return false;}
virtual void requestHandler();
static void init(Util::Config * cfg);
protected:
Socket::Connection listConn;
std::string pushURL;
};
}
typedef Mist::OutPush mistOut;

View file

@ -903,12 +903,13 @@ namespace Mist {
}
JSON::Value pack_out = F.toJSON(myMeta, *amf_storage, next.cs_id*3 + (F.data[0] == 0x09 ? 0 : (F.data[0] == 0x08 ? 1 : 2) ));
if ( !pack_out.isNull()){
if (!userClient.getData()){
if (!nProxy.userClient.getData()){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
userClient = IPC::sharedClient(userPageName, 30, true);
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
}
continueNegotiate(pack_out["trackid"].asInt());
nProxy.streamName = streamName;
bufferLivePacket(pack_out);
}
break;

View file

@ -33,7 +33,7 @@ namespace Mist {
if (packData.getBytesFree() == 184){
packData.clear();
packData.setPID(0x100 - 1 + thisPacket.getTrackId());
packData.setPID(thisPacket.getTrackId());
packData.setContinuityCounter(++contCounters[packData.getPID()]);
if (first[thisPacket.getTrackId()]){
packData.setUnitStart(1);