Fully implemented DTSC pull support

This commit is contained in:
Erik Zandvliet 2016-03-15 11:29:01 +01:00 committed by Thulinma
parent 668560ff05
commit dda0ea669c
27 changed files with 930 additions and 272 deletions

View file

@ -160,6 +160,7 @@ set(libHeaders
${SOURCE_DIR}/lib/tinythread.h
${SOURCE_DIR}/lib/ts_packet.h
${SOURCE_DIR}/lib/ts_stream.h
${SOURCE_DIR}/lib/util.h
${SOURCE_DIR}/lib/vorbis.h
${SOURCE_DIR}/lib/triggers.h
)
@ -203,6 +204,7 @@ set(libSources
${SOURCE_DIR}/lib/tinythread.cpp
${SOURCE_DIR}/lib/ts_packet.cpp
${SOURCE_DIR}/lib/ts_stream.cpp
${SOURCE_DIR}/lib/util.cpp
${SOURCE_DIR}/lib/vorbis.cpp
${SOURCE_DIR}/lib/triggers.cpp
)
@ -290,9 +292,6 @@ macro(makeInput inputName format)
#Set compile definitions
unset(my_definitions)
if (";${ARGN};" MATCHES ";nolock;")#Currently only used in TSStream
list(APPEND my_definitions "INPUT_NOLOCK")
endif()
if (";${ARGN};" MATCHES ";tslive;")
list(APPEND my_definitions "TSLIVE_INPUT")
endif()
@ -320,7 +319,7 @@ makeInput(Buffer buffer)
makeInput(ISMV ismv)#LTS
makeInput(MP4 mp4)#LTS
makeInput(TS ts)#LTS
makeInput(TSStream ts nolock tslive)#LTS
makeInput(TSStream ts tslive)#LTS
makeInput(Folder folder folder)#LTS
########################################

View file

@ -1,7 +1,7 @@
- Construct input
- Parse arguments
- Stream wordt gelocked IFF !nolock
- Stream wordt gelocked IFF conv.needsLock()
- Start .run()
- setup(): opent files/sockets/etc waar nodig
- set "isStream" naar true

View file

@ -264,6 +264,10 @@ bool Util::Config::parseArgs(int & argc, char ** & argv) {
return true;
}
bool Util::Config::hasOption(const std::string & optname) {
return vals.isMember(optname);
}
/// Returns a reference to the current value of an option or default if none was set.
/// If the option does not exist, this exits the application with a return code of 37.
JSON::Value & Util::Config::getOption(std::string optname, bool asArray) {

View file

@ -30,6 +30,7 @@ namespace Util {
void addOption(std::string optname, JSON::Value option);
void printHelp(std::ostream & output);
bool parseArgs(int & argc, char ** & argv);
bool hasOption(const std::string & optname);
JSON::Value & getOption(std::string optname, bool asArray = false);
std::string getString(std::string optname);
long long int getInteger(std::string optname);

View file

@ -109,6 +109,7 @@ namespace DTSC {
void operator = (const Packet & rhs);
operator bool() const;
packType getVersion() const;
void reInit(Socket::Connection & src);
void reInit(const char * data_, unsigned int len, bool noCopy = false);
void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, long long packBytePos, bool isKeyframe);
void getString(const char * identifier, char *& result, unsigned int & len) const;
@ -354,8 +355,8 @@ namespace DTSC {
void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000);
LTS*/
void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000, const char * iVec = 0);
unsigned int getSendLen(bool skipDynamic = false);
void send(Socket::Connection & conn, bool skipDynamic = false);
unsigned int getSendLen(bool skipDynamic = false, std::set<unsigned long> selectedTracks = std::set<unsigned long>());
void send(Socket::Connection & conn, bool skipDynamic = false, std::set<unsigned long> selectedTracks = std::set<unsigned long>());
void writeTo(char * p);
JSON::Value toJSON();
void reset();

View file

@ -109,6 +109,32 @@ namespace DTSC {
}
}
void Packet::reInit(Socket::Connection & src) {
int sleepCount = 0;
null();
int toReceive = 0;
while (src.connected()){
if (!toReceive && src.Received().available(8)){
if (src.Received().copy(2) != "DT"){
INFO_MSG("Invalid DTSC Packet header encountered (%s)", src.Received().copy(4).c_str());
break;
}
toReceive = Bit::btohl(src.Received().copy(8).data() + 4);
}
if (toReceive && src.Received().available(toReceive + 8)){
std::string dataBuf = src.Received().remove(toReceive + 8);
reInit(dataBuf.data(), dataBuf.size());
return;
}
if(!src.spool()){
if (sleepCount++ > 5){
return;
}
Util::sleep(500);
}
}
}
///\brief Initializes a packet with new data
///\param data_ The new data for the packet
///\param len The length of the data pointed to by data_
@ -1530,7 +1556,7 @@ namespace DTSC {
} else if (type == "video") {
result += 48;
}
if (missedFrags) {
if (!skipDynamic && missedFrags) {
result += 23;
}
return result;
@ -1709,10 +1735,12 @@ namespace DTSC {
}
///\brief Determines the "packed" size of a meta object
unsigned int Meta::getSendLen(bool skipDynamic) {
unsigned int Meta::getSendLen(bool skipDynamic, std::set<unsigned long> selectedTracks) {
unsigned int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21;
for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) {
dataLen += it->second.getSendLen(skipDynamic);
if (!selectedTracks.size() || selectedTracks.count(it->first)){
dataLen += it->second.getSendLen(skipDynamic);
}
}
return dataLen + 8; //add 8 bytes header
}
@ -1749,13 +1777,15 @@ namespace DTSC {
}
///\brief Writes a meta object to a socket
void Meta::send(Socket::Connection & conn, bool skipDynamic) {
int dataLen = getSendLen(skipDynamic) - 8; //strip 8 bytes header
void Meta::send(Socket::Connection & conn, bool skipDynamic, std::set<unsigned long> selectedTracks) {
int dataLen = getSendLen(skipDynamic, selectedTracks) - 8; //strip 8 bytes header
conn.SendNow(DTSC::Magic_Header, 4);
conn.SendNow(convertInt(dataLen), 4);
conn.SendNow("\340\000\006tracks\340", 10);
for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) {
it->second.send(conn, skipDynamic);
if (!selectedTracks.size() || selectedTracks.count(it->first)){
it->second.send(conn, skipDynamic);
}
}
conn.SendNow("\000\000\356", 3);//End tracks object
if (vod) {

View file

@ -1068,6 +1068,54 @@ namespace IPC {
///\brief The deconstructor
sharedClient::~sharedClient() {
mySemaphore.close();
}
bool sharedClient::isSingleEntry() {
semaphore tmpSem(baseName.c_str(), O_RDWR);
if (!tmpSem) {
HIGH_MSG("Creating semaphore %s failed: %s, assuming we're alone", baseName.c_str(), strerror(errno));
return true;
}
//Empty is used to compare for emptyness. This is not needed when the page uses a counter
char * empty = 0;
if (!hasCounter) {
empty = (char *)malloc(payLen * sizeof(char));
if (!empty) {
HIGH_MSG("Failed to allocate %u bytes for empty payload, assuming we're not alone", payLen);
return false;
}
memset(empty, 0, payLen);
}
bool result = true;
{
semGuard tmpGuard(&tmpSem);
for (char i = 'A'; i <= 'Z'; i++) {
sharedPage tmpPage(baseName.substr(1) + i, (4096 << (i - 'A')), false, false);
if (!tmpPage.mapped) {
break;
}
int offset = 0;
while (offset + payLen + (hasCounter ? 1 : 0) <= tmpPage.len) {
//Skip our own entry
if (tmpPage.name == myPage.name && offset == offsetOnPage){
offset += payLen + (hasCounter ? 1 : 0);
continue;
}
if (!((hasCounter && tmpPage.mapped[offset] == 0) || (!hasCounter && !memcmp(tmpPage.mapped + offset, empty, payLen)))) {
result = false;
break;
}
offset += payLen + (hasCounter ? 1 : 0);
}
}
}
if (empty) {
free(empty);
}
return result;
}
///\brief Writes data to the shared data
@ -1113,6 +1161,16 @@ namespace IPC {
}
return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0));
}
int sharedClient::getCounter() {
if (!hasCounter){
return -1;
}
if (!myPage.mapped) {
return 0;
}
return *(myPage.mapped + offsetOnPage);
}
userConnection::userConnection(char * _data) {
data = _data;

View file

@ -60,7 +60,7 @@ namespace IPC {
class semaphore {
public:
semaphore();
semaphore(const char * name, int oflag, mode_t mode, unsigned int value);
semaphore(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0);
~semaphore();
operator bool() const;
void open(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0);
@ -220,6 +220,8 @@ namespace IPC {
void finish();
void keepAlive();
char * getData();
int getCounter();
bool isSingleEntry();
private:
///\brief The basename of the shared pages.
std::string baseName;

View file

@ -204,6 +204,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
//check in curConf for capabilities-inputs-<naam>-priority/source_match
std::string player_bin;
bool pullMode = false;
bool selected = false;
long long int curPrio = -1;
DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs");
@ -224,6 +225,20 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
curPrio = input.getMember("priority").asInt();
selected = true;
}
if (input.hasMember("stream_match")){
source = input.getMember("stream_match").asString();
front = source.substr(0,source.find('*'));
back = source.substr(source.find('*')+1);
DEBUG_MSG(DLVL_MEDIUM, "Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str());
if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){
player_bin = Util::getMyPath() + "MistIn" + input.getMember("name").asString();
curPrio = input.getMember("priority").asInt();
pullMode = true;
selected = true;
}
}
}
}
@ -261,9 +276,16 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir
//finally, unlock the config semaphore
configLock.post();
DEBUG_MSG(DLVL_MEDIUM, "Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str());
if (pullMode){
DEBUG_MSG(DLVL_MEDIUM, "Starting %s -p -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str());
}else{
DEBUG_MSG(DLVL_MEDIUM, "Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str());
}
char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()};
int argNum = 3;
if (pullMode){
argv[++argNum] = (char*)"--pull";
}
std::string debugLvl;
if (Util::Config::printDebugLevel != DEBUG && !str_args.count("--debug")){
debugLvl = JSON::Value((long long)Util::Config::printDebugLevel).asString();

40
lib/util.cpp Normal file
View file

@ -0,0 +1,40 @@
#include "util.h"
#include <iostream>
namespace Util {
bool stringScan(const std::string & src, const std::string & pattern, std::deque<std::string> & result){
result.clear();
std::deque<size_t> positions;
size_t pos = pattern.find("%", 0);
while (pos != std::string::npos){
positions.push_back(pos);
pos = pattern.find("%", pos + 1);
}
if (positions.size() == 0){
return false;
}
size_t sourcePos = 0;
size_t patternPos = 0;
std::deque<size_t>::iterator posIter = positions.begin();
while (sourcePos != std::string::npos){
//Match first part of the string
if (pattern.substr(patternPos, *posIter - patternPos) != src.substr(sourcePos, *posIter - patternPos)){
break;
}
sourcePos += *posIter - patternPos;
std::deque<size_t>::iterator nxtIter = posIter + 1;
if (nxtIter != positions.end()){
patternPos = *posIter+2;
size_t tmpPos = src.find(pattern.substr(*posIter+2, *nxtIter - patternPos), sourcePos);
result.push_back(src.substr(sourcePos, tmpPos - sourcePos));
sourcePos = tmpPos;
}else{
result.push_back(src.substr(sourcePos));
sourcePos = std::string::npos;
}
posIter++;
}
return result.size() == positions.size();
}
}

6
lib/util.h Normal file
View file

@ -0,0 +1,6 @@
#include <string>
#include <deque>
namespace Util {
bool stringScan(const std::string & src, const std::string & pattern, std::deque<std::string> & result);
}

View file

@ -83,6 +83,7 @@ namespace Mist {
singleton = this;
isBuffer = false;
streamMode = false;
}
void Input::checkHeaderTimes(std::string streamFile) {
@ -115,16 +116,27 @@ namespace Mist {
}
}
bool Input::needsLock() {
return !(config->hasOption("pull") && config->getBool("pull"));
}
int Input::run() {
if (config->getBool("json")) {
std::cout << capa.toString() << std::endl;
return 0;
}
if (streamName != "") {
config->getOption("streamname") = streamName;
}
streamName = config->getString("streamname");
nProxy.streamName = streamName;
if (config->getBool("json")) {
std::cout << capa.toString() << std::endl;
return 0;
}
streamMode = config->hasOption("pull") && config->getBool("pull");
INFO_MSG("Stream %s in %s mode", streamName.c_str(), streamMode ? "stream" : "non-stream");
if (!setup()) {
std::cerr << config->getString("cmd") << " setup failed." << std::endl;
return 0;
@ -139,7 +151,9 @@ namespace Mist {
if (!streamName.size()) {
convert();
} else {
} else if (streamMode) {
stream();
}else{
serve();
}
return 0;
@ -243,53 +257,92 @@ namespace Mist {
/// Main loop for stream-style inputs.
/// This loop will start the buffer without resume support, and then repeatedly call ..... followed by ....
void Input::stream(){
IPC::semaphore pullLock;
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!pullLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A pull process for stream %s is already running", streamName.c_str());
return;
}
if (Util::streamAlive(streamName)){
pullLock.post();
pullLock.close();
return;
}
if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer
pullLock.post();
pullLock.close();
return;
}
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
/*LTS-START*/
if(Triggers::shouldTrigger("STREAM_READY", config->getString("streamname"))){
std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n";
if (!Triggers::doTrigger("STREAM_READY", payload, config->getString("streamname"))){
config->is_active = false;
}
}
/*LTS-END*/
userPage.init(userPageName, PLAY_EX_SIZE, true);
if (!isBuffer) {
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
bufferFrame(it->first, 1);
}
}
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str());
long long int activityCounter = Util::bootSecs();
while ((Util::bootSecs() - activityCounter) < 10 && config->is_active) { //10 second timeout
userPage.parseEach(callbackWrapper);
removeUnused();
if (userPage.amount) {
activityCounter = Util::bootSecs();
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount);
} else {
DEBUG_MSG(DLVL_INSANE, "Timer running");
}
/*LTS-START*/
if ((Util::bootSecs() - activityCounter) >= 10 || !config->is_active){//10 second timeout
if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){
std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n";
if (!Triggers::doTrigger("STREAM_UNLOAD", payload, config->getString("streamname"))){
activityCounter = Util::bootSecs();
config->is_active = true;
if (!openStreamSource()){
FAIL_MSG("Unable to connect to source");
pullLock.post();
pullLock.close();
return;
}
parseStreamHeader();
if (myMeta.tracks.size() == 0){
nProxy.userClient.finish();
finish();
pullLock.post();
pullLock.close();
return;
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
it->second.firstms = 0;
it->second.lastms = 0;
}
getNext();
unsigned long long lastTime = Util::getMS();
unsigned long long lastActive = Util::getMS();
while (thisPacket && config->is_active){
nProxy.bufferLivePacket(thisPacket, myMeta);
getNext();
nProxy.userClient.keepAlive();
if (Util::getMS() - lastTime >= 1000){
lastTime = Util::getMS();
if (nProxy.userClient.isSingleEntry()){
if (lastTime - lastActive >= 10000){//10sec timeout
config->is_active = false;
}
}else{
lastActive = lastTime;
}
}
/*LTS-END*/
if (config->is_active){
Util::sleep(1000);
}
closeStreamSource();
while (config->is_active){
Util::sleep(500);
nProxy.userClient.keepAlive();
if (Util::getMS() - lastTime >= 1000){
lastTime = Util::getMS();
if (nProxy.userClient.isSingleEntry()){
if (lastTime - lastActive >= 10000){//10sec timeout
config->is_active = false;
}
}else{
lastActive = lastTime;
}
}
}
nProxy.userClient.finish();
finish();
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str());
//end player functionality
pullLock.post();
pullLock.close();
return;
}
void Input::finish() {

View file

@ -23,6 +23,8 @@ namespace Mist {
virtual void onCrash(){}
virtual void argumentsParsed(){}
virtual ~Input() {};
virtual bool needsLock();
protected:
static void callbackWrapper(char * data, size_t len, unsigned int id);
virtual bool setup() = 0;
@ -31,6 +33,9 @@ namespace Mist {
virtual void getNext(bool smart = true) {};
virtual void seek(int seekTime){};
virtual void finish();
virtual bool openStreamSource() { return false; };
virtual void closeStreamSource() {};
virtual void parseStreamHeader() {};
void play(int until = 0);
void playOnce();
void quitPlay();
@ -40,6 +45,9 @@ namespace Mist {
virtual void userCallback(char * data, size_t len, unsigned int id);
virtual void convert();
virtual void serve();
virtual void stream();
bool streamMode;
virtual void parseHeader();
bool bufferFrame(unsigned int track, unsigned int keyNum);

View file

@ -23,7 +23,7 @@
/*LTS-END*/
namespace Mist {
inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg){
inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg) {
capa["name"] = "Buffer";
JSON::Value option;
option["arg"] = "integer";
@ -94,7 +94,7 @@ namespace Mist {
capa["optional"]["segmentsize"]["type"] = "uint";
capa["optional"]["segmentsize"]["default"] = 5000LL;
option.null();
option["arg"] = "string";
option["long"] = "udp-port";
option["short"] = "U";
@ -139,39 +139,39 @@ namespace Mist {
resumeMode = false;
}
inputBuffer::~inputBuffer(){
inputBuffer::~inputBuffer() {
config->is_active = false;
if (myMeta.tracks.size()){
if (myMeta.tracks.size()) {
/*LTS-START*/
if (myMeta.bufferWindow){
if(Triggers::shouldTrigger("STREAM_BUFFER")){
std::string payload = config->getString("streamname")+"\nEMPTY";
if (myMeta.bufferWindow) {
if (Triggers::shouldTrigger("STREAM_BUFFER")) {
std::string payload = config->getString("streamname") + "\nEMPTY";
Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname"));
}
}
/*LTS-END*/
DEBUG_MSG(DLVL_DEVEL, "Cleaning up, removing last keyframes");
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[it->first];
if (!nProxy.metaPages.count(it->first) || !nProxy.metaPages[it->first].mapped){
if (!nProxy.metaPages.count(it->first) || !nProxy.metaPages[it->first].mapped) {
continue;
}
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8){
for (int i = 0; i < 8192; i += 8) {
int * tmpOffset = (int *)(nProxy.metaPages[it->first].mapped + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
if (tmpOffset[0] == 0 && tmpOffset[1] == 0) {
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)){
if (!locations.count(keyNum)) {
locations[keyNum].curOffset = 0;
}
locations[keyNum].pageNum = keyNum;
locations[keyNum].keyNum = ntohl(tmpOffset[1]);
}
for (std::map<unsigned long, DTSCPageData>::iterator it2 = locations.begin(); it2 != locations.end(); it2++){
for (std::map<unsigned long, DTSCPageData>::iterator it2 = locations.begin(); it2 != locations.end(); it2++) {
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), it->first, it2->first);
IPC::sharedPage erasePage(thisPageName, 20971520);
@ -244,48 +244,50 @@ namespace Mist {
}
}
/// \triggers
/// \triggers
/// The `"STREAM_BUFFER"` trigger is stream-specific, and is ran whenever the buffer changes state between playable (FULL) or not (EMPTY). It cannot be cancelled. It is possible to receive multiple EMPTY calls without FULL calls in between, as EMPTY is always generated when a stream is unloaded from memory, even if this stream never reached playable state in the first place (e.g. a broadcast was cancelled before filling enough buffer to be playable). Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// FULL or EMPTY (depending on current state)
/// ~~~~~~~~~~~~~~~
void inputBuffer::updateMeta(){
void inputBuffer::updateMeta() {
static long long unsigned int lastFragCount = 0xFFFFull;
long long unsigned int firstms = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int lastms = 0;
long long unsigned int fragCount = 0xFFFFull;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "meta" || !it->second.type.size()){continue;}
if (it->second.init.size()){
if (!initData.count(it->first) || initData[it->first] != it->second.init){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.type == "meta" || !it->second.type.size()) {
continue;
}
if (it->second.init.size()) {
if (!initData.count(it->first) || initData[it->first] != it->second.init) {
initData[it->first] = it->second.init;
}
}else{
if (initData.count(it->first)){
} else {
if (initData.count(it->first)) {
it->second.init = initData[it->first];
}
}
if (it->second.fragments.size() < fragCount){
if (it->second.fragments.size() < fragCount) {
fragCount = it->second.fragments.size();
}
if (it->second.firstms < firstms){
if (it->second.firstms < firstms) {
firstms = it->second.firstms;
}
if (it->second.lastms > lastms){
if (it->second.lastms > lastms) {
lastms = it->second.lastms;
}
}
/*LTS-START*/
if (fragCount >= FRAG_BOOT && fragCount != 0xFFFFull && (lastFragCount == 0xFFFFull || lastFragCount < FRAG_BOOT)){
if(Triggers::shouldTrigger("STREAM_BUFFER")){
std::string payload = config->getString("streamname")+"\nFULL";
if (fragCount >= FRAG_BOOT && fragCount != 0xFFFFull && (lastFragCount == 0xFFFFull || lastFragCount < FRAG_BOOT)) {
if (Triggers::shouldTrigger("STREAM_BUFFER")) {
std::string payload = config->getString("streamname") + "\nFULL";
Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname"));
}
}
if ((fragCount < FRAG_BOOT || fragCount == 0xFFFFull) && (lastFragCount >= FRAG_BOOT && lastFragCount != 0xFFFFull)){
if(Triggers::shouldTrigger("STREAM_BUFFER")){
std::string payload = config->getString("streamname")+"\nEMPTY";
if ((fragCount < FRAG_BOOT || fragCount == 0xFFFFull) && (lastFragCount >= FRAG_BOOT && lastFragCount != 0xFFFFull)) {
if (Triggers::shouldTrigger("STREAM_BUFFER")) {
std::string payload = config->getString("streamname") + "\nEMPTY";
Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname"));
}
}
@ -298,7 +300,7 @@ namespace Mist {
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.wait();
if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped){
if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped) {
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
nProxy.metaPages[0].init(pageName, DEFAULT_META_PAGE_SIZE, true);
@ -309,19 +311,19 @@ namespace Mist {
liveMeta.post();
}
bool inputBuffer::removeKey(unsigned int tid){
if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active){
bool inputBuffer::removeKey(unsigned int tid) {
if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active) {
return false;
}
if (!myMeta.tracks[tid].keys.size()){
if (!myMeta.tracks[tid].keys.size()) {
return false;
}
DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber());
//remove all parts of this key
for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++){
for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) {
/*LTS-START*/
if (recFile.is_open()){
if (!recMeta.tracks.count(tid)){
if (recFile.is_open()) {
if (!recMeta.tracks.count(tid)) {
recMeta.tracks[tid] = myMeta.tracks[tid];
recMeta.tracks[tid].reset();
}
@ -335,12 +337,12 @@ namespace Mist {
//re-calculate firstms
myMeta.tracks[tid].firstms = myMeta.tracks[tid].keys[0].getTime();
//delete the fragment if it's no longer fully buffered
if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()){
if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()) {
myMeta.tracks[tid].fragments.pop_front();
myMeta.tracks[tid].missedFrags ++;
}
//if there is more than one page buffered for this track...
if (bufferLocations[tid].size() > 1){
if (bufferLocations[tid].size() > 1) {
//Check if the first key starts on the second page or higher
if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){
DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
@ -361,11 +363,11 @@ namespace Mist {
return true;
}
void inputBuffer::eraseTrackDataPages(unsigned long tid){
if (!bufferLocations.count(tid)){
void inputBuffer::eraseTrackDataPages(unsigned long tid) {
if (!bufferLocations.count(tid)) {
return;
}
for (std::map<unsigned long, DTSCPageData>::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++){
for (std::map<unsigned long, DTSCPageData>::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++) {
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tid, it->first);
nProxy.curPage[tid].init(thisPageName, 20971520, false, false);
@ -380,68 +382,68 @@ namespace Mist {
void inputBuffer::finish() {
Input::finish();
updateMeta();
if (bufferLocations.size()){
if (bufferLocations.size()) {
std::set<unsigned long> toErase;
for (std::map<unsigned long, std::map<unsigned long, DTSCPageData> >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++){
for (std::map<unsigned long, std::map<unsigned long, DTSCPageData> >::iterator it = bufferLocations.begin(); it != bufferLocations.end(); it++) {
toErase.insert(it->first);
}
for (std::set<unsigned long>::iterator it = toErase.begin(); it != toErase.end(); ++it){
for (std::set<unsigned long>::iterator it = toErase.begin(); it != toErase.end(); ++it) {
eraseTrackDataPages(*it);
}
}
}
/// \triggers
/// \triggers
/// The `"STREAM_TRACK_REMOVE"` trigger is stream-specific, and is ran whenever a track is fully removed from a live strean buffer. It cannot be cancelled. Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// trackID
/// ~~~~~~~~~~~~~~~
void inputBuffer::removeUnused(){
void inputBuffer::removeUnused() {
//first remove all tracks that have not been updated for too long
bool changed = true;
while (changed){
while (changed) {
changed = false;
long long unsigned int time = Util::bootSecs();
long long unsigned int compareFirst = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int compareLast = 0;
//for tracks that were updated in the last 5 seconds, get the first and last ms edges.
for (std::map<unsigned int, DTSC::Track>::iterator it2 = myMeta.tracks.begin(); it2 != myMeta.tracks.end(); it2++){
if ((time - lastUpdated[it2->first]) > 5){
for (std::map<unsigned int, DTSC::Track>::iterator it2 = myMeta.tracks.begin(); it2 != myMeta.tracks.end(); it2++) {
if ((time - lastUpdated[it2->first]) > 5) {
continue;
}
if (it2->second.lastms > compareLast){
if (it2->second.lastms > compareLast) {
compareLast = it2->second.lastms;
}
if (it2->second.firstms < compareFirst){
if (it2->second.firstms < compareFirst) {
compareFirst = it2->second.firstms;
}
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
//if not updated for an entire buffer duration, or last updated track and this track differ by an entire buffer duration, erase the track.
if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000) ||
(compareLast && (long long int)(time - lastUpdated[it->first]) > 5 && (
(compareLast < it->second.firstms && (long long int)(it->second.firstms - compareLast) > bufferTime)
||
(compareFirst > it->second.lastms && (long long int)(compareFirst - it->second.lastms) > bufferTime)
))
){
(compareLast && (long long int)(time - lastUpdated[it->first]) > 5 && (
(compareLast < it->second.firstms && (long long int)(it->second.firstms - compareLast) > bufferTime)
||
(compareFirst > it->second.lastms && (long long int)(compareFirst - it->second.lastms) > bufferTime)
))
) {
unsigned int tid = it->first;
//erase this track
if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000)){
if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000)) {
INFO_MSG("Erasing track %d because not updated for %ds (> %ds)", it->first, (long long int)(time - lastUpdated[it->first]), (long long int)(bufferTime / 1000));
}else{
INFO_MSG("Erasing inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst/1000, compareLast/1000, bufferTime / 1000);
} else {
INFO_MSG("Erasing inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000);
}
/*LTS-START*/
if(Triggers::shouldTrigger("STREAM_TRACK_REMOVE")){
std::string payload = config->getString("streamname")+"\n"+JSON::Value((long long)it->first).asString()+"\n";
if (Triggers::shouldTrigger("STREAM_TRACK_REMOVE")) {
std::string payload = config->getString("streamname") + "\n" + JSON::Value((long long)it->first).asString() + "\n";
Triggers::doTrigger("STREAM_TRACK_REMOVE", payload, config->getString("streamname"));
}
}
/*LTS-END*/
lastUpdated.erase(tid);
/// \todo Consider replacing with eraseTrackDataPages(it->first)?
while (bufferLocations[tid].size()){
while (bufferLocations[tid].size()) {
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), (unsigned long)tid, bufferLocations[tid].begin()->first);
nProxy.curPage[tid].init(thisPageName, 20971520);
@ -477,14 +479,14 @@ namespace Mist {
}
//find the earliest video keyframe stored
unsigned int firstVideo = 1;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "video"){
if (it->second.firstms < firstVideo || firstVideo == 1){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.type == "video") {
if (it->second.firstms < firstVideo || firstVideo == 1) {
firstVideo = it->second.firstms;
}
}
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
//non-video tracks need to have a second keyframe that is <= firstVideo
//firstVideo = 1 happens when there are no tracks, in which case we don't care any more
if (it->second.type != "video"){
@ -493,39 +495,39 @@ namespace Mist {
}
}
//Buffer cutting
while (it->second.keys.size() > 1 && it->second.keys[0].getTime() < cutTime){
if (!removeKey(it->first)){
while (it->second.keys.size() > 1 && it->second.keys[0].getTime() < cutTime) {
if (!removeKey(it->first)) {
break;
}
}
//Buffer size management
while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime){
if (!removeKey(it->first)){
while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime) {
if (!removeKey(it->first)) {
break;
}
}
}
updateMeta();
static bool everHadPush = false;
if (hasPush){
if (hasPush) {
hasPush = false;
everHadPush = true;
}else if(everHadPush && !resumeMode && config->is_active){
} else if (everHadPush && !resumeMode && config->is_active) {
INFO_MSG("Shutting down buffer because resume mode is disabled and the source disconnected");
config->is_active = false;
}
}
/// \triggers
/// \triggers
/// The `"STREAM_TRACK_ADD"` trigger is stream-specific, and is ran whenever a new track is added to a live strean buffer. It cannot be cancelled. Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// trackID
/// ~~~~~~~~~~~~~~~
void inputBuffer::userCallback(char * data, size_t len, unsigned int id){
void inputBuffer::userCallback(char * data, size_t len, unsigned int id) {
/*LTS-START*/
//Reload the configuration to make sure we stay up to date with changes through the api
if (Util::epoch() - lastReTime > 4){
if (Util::epoch() - lastReTime > 4) {
setup();
}
/*LTS-END*/
@ -535,27 +537,27 @@ namespace Mist {
char counter = (*(data - 1));
//Each user can have at maximum SIMUL_TRACKS elements in their userpage.
IPC::userConnection userConn(data);
for (int index = 0; index < SIMUL_TRACKS; index++){
for (int index = 0; index < SIMUL_TRACKS; index++) {
//Get the track id from the current element
unsigned long value = userConn.getTrackId(index);
//Skip value 0xFFFFFFFF as this indicates a previously declined track
if (value == 0xFFFFFFFF){
if (value == 0xFFFFFFFF) {
continue;
}
//Skip value 0 as this indicates an empty track
if (value == 0){
if (value == 0) {
continue;
}
//If the current value indicates a valid trackid, and it is pushed from this user
if (pushLocation[value] == data){
if (pushLocation[value] == data) {
//Check for timeouts, and erase the track if necessary
if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
if (counter == 126 || counter == 127 || counter == 254 || counter == 255) {
pushLocation.erase(value);
if (negotiatingTracks.count(value)){
if (negotiatingTracks.count(value)) {
negotiatingTracks.erase(value);
}
if (activeTracks.count(value)){
if (activeTracks.count(value)) {
updateMeta();
eraseTrackDataPages(value);
activeTracks.erase(value);
@ -568,7 +570,67 @@ namespace Mist {
}
//Track is set to "New track request", assign new track id and create shared memory page
//This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process
if (value & 0x80000000){
if (value & 0x80000000) {
if (value & 0x40000000) {
unsigned long finalMap = value & ~0xC0000000;
//Register the new track as an active track.
activeTracks.insert(finalMap);
//Register the time of registration as initial value for the lastUpdated field, plus an extra 5 seconds just to be sure.
lastUpdated[finalMap] = Util::bootSecs() + 5;
//Register the user thats is pushing this element
pushLocation[finalMap] = data;
//Initialize the metadata for this track
if (!myMeta.tracks.count(finalMap)) {
DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap);
IPC::sharedPage tMeta;
char tempMetaName[NAME_BUFFER_SIZE];
snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), finalMap);
tMeta.init(tempMetaName, 8388608, false);
//The page exist, now we try to read in the metadata of the track
//Store the size of the dtsc packet to read.
unsigned int len = ntohl(((int *)tMeta.mapped)[1]);
//Temporary variable, won't be used again
unsigned int tempForReadingMeta = 0;
//Read in the metadata through a temporary JSON object
///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately
JSON::Value tempJSONForMeta;
JSON::fromDTMI((const unsigned char *)tMeta.mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
tMeta.master = true;
//Construct a metadata object for the current track
DTSC::Meta trackMeta(tempJSONForMeta);
myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second;
myMeta.tracks[finalMap].firstms = 0;
myMeta.tracks[finalMap].lastms = 0;
userConn.setTrackId(index, finalMap);
userConn.setKeynum(index, 0x0000);
char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap);
nProxy.metaPages[finalMap].init(firstPage, 8192, false);
INFO_MSG("Meh %d", finalMap);
//Update the metadata for this track
updateTrackMeta(finalMap);
INFO_MSG("Setting hasPush to true, quickNegotiate");
hasPush = true;
}
//Write the final mapped track number and keyframe number to the user page element
//This is used to resume pushing as well as pushing new tracks
userConn.setTrackId(index, finalMap);
userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size());
//Update the metadata to reflect all changes
updateMeta();
continue;
}
//Set the temporary track id for this item, and increase the temporary value for use with the next track
unsigned long long tempMapping = nextTempId++;
//Add the temporary track id to the list of tracks that are currently being negotiated
@ -583,9 +645,9 @@ namespace Mist {
}
//The track id is set to the value of a track that we are currently negotiating about
if (negotiatingTracks.count(value)){
if (negotiatingTracks.count(value)) {
//If the metadata page for this track is not yet registered, initialize it
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped){
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) {
char tempMetaName[NAME_BUFFER_SIZE];
snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), value);
nProxy.metaPages[value].init(tempMetaName, 8388608, false, false);
@ -593,7 +655,7 @@ namespace Mist {
//If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later
if (!nProxy.metaPages[value].mapped) {
//remove the negotiation if it has timed out
if (++negotiationTimeout[value] >= 1000){
if (++negotiationTimeout[value] >= 1000) {
negotiatingTracks.erase(value);
negotiationTimeout.erase(value);
}
@ -615,7 +677,7 @@ namespace Mist {
//If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call.
if (!trackMeta.tracks.count(value)) {
//remove the negotiation if it has timed out
if (++negotiationTimeout[value] >= 1000){
if (++negotiationTimeout[value] >= 1000) {
negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
nProxy.metaPages[value].master = true;
@ -630,10 +692,10 @@ namespace Mist {
/*LTS-START*/
//Get the identifier for the track, and attempt colission detection.
int collidesWith = -1;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
//If the identifier of an existing track and the current track match, assume the are the same track and reject the negotiated one.
///\todo Maybe switch to a new form of detecting collisions, especially with regards to multiple audio languages and camera angles.
if (it->second.getIdentifier() == trackIdentifier){
if (it->second.getIdentifier() == trackIdentifier) {
collidesWith = it->first;
break;
}
@ -646,21 +708,21 @@ namespace Mist {
nProxy.metaPages.erase(value);
//Check if the track collides, and whether the track it collides with is active.
if (collidesWith != -1 && activeTracks.count(collidesWith)){/*LTS*/
if (collidesWith != -1 && activeTracks.count(collidesWith)) { /*LTS*/
//Print a warning message and set the state of the track to rejected.
WARN_MSG("Collision of temporary track %lu with existing track %d detected. Handling as a new valid track.", value, collidesWith);
collidesWith = -1;
}
/*LTS-START*/
unsigned long finalMap = collidesWith;
if (finalMap == -1){
if (finalMap == -1) {
//No collision has been detected, assign a new final number
finalMap = (myMeta.tracks.size() ? myMeta.tracks.rbegin()->first : 0) + 1;
DEBUG_MSG(DLVL_DEVEL, "No colision detected for temporary track %lu from user %u, assigning final track number %lu", value, id, finalMap);
if(Triggers::shouldTrigger("STREAM_TRACK_ADD")){
std::string payload = config->getString("streamname")+"\n"+JSON::Value((long long)finalMap).asString()+"\n";
if (Triggers::shouldTrigger("STREAM_TRACK_ADD")) {
std::string payload = config->getString("streamname") + "\n" + JSON::Value((long long)finalMap).asString() + "\n";
Triggers::doTrigger("STREAM_TRACK_ADD", payload, config->getString("streamname"));
}
}
}
/*LTS-END*/
//Resume either if we have more than 1 keyframe on the replacement track (assume it was already pushing before the track "dissapeared")
@ -690,7 +752,7 @@ namespace Mist {
//Register the user thats is pushing this element
pushLocation[finalMap] = data;
//Initialize the metadata for this track if it was not in place yet.
if (!myMeta.tracks.count(finalMap)){
if (!myMeta.tracks.count(finalMap)) {
DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap);
myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second;
myMeta.tracks[finalMap].trackID = finalMap;
@ -703,14 +765,14 @@ namespace Mist {
updateMeta();
}
//If the track is active, and this is the element responsible for pushing it
if (activeTracks.count(value) && pushLocation[value] == data){
if (activeTracks.count(value) && pushLocation[value] == data) {
//Open the track index page if we dont have it open yet
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped){
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) {
char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), value);
nProxy.metaPages[value].init(firstPage, 8192, false, false);
}
if (nProxy.metaPages[value].mapped){
if (nProxy.metaPages[value].mapped) {
//Update the metadata for this track
updateTrackMeta(value);
hasPush = true;
@ -719,7 +781,7 @@ namespace Mist {
}
}
void inputBuffer::updateTrackMeta(unsigned long tNum){
void inputBuffer::updateTrackMeta(unsigned long tNum) {
VERYHIGH_MSG("Updating meta for track %d", tNum);
//Store a reference for easier access
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum];
@ -728,34 +790,34 @@ namespace Mist {
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) {
int * tmpOffset = (int *)(mappedPointer + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
if (tmpOffset[0] == 0 && tmpOffset[1] == 0) {
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
INSANE_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1]));
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)){
if (!locations.count(keyNum)) {
locations[keyNum].curOffset = 0;
}
locations[keyNum].pageNum = keyNum;
locations[keyNum].keyNum = ntohl(tmpOffset[1]);
}
//Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest
for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++){
for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) {
updateMetaFromPage(tNum, pageIt->first);
}
updateMeta();
}
void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum){
void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum) {
VERYHIGH_MSG("Updating meta for track %d page %d", tNum, pageNum);
DTSCPageData & pageData = bufferLocations[tNum][pageNum];
//If the current page is over its 8mb "splitting" boundary
if (pageData.curOffset > (8 * 1024 * 1024)){
if (pageData.curOffset > (8 * 1024 * 1024)) {
//And the last keyframe in the parsed metadata is further in the stream than this page
if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()){
if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()) {
//Assume the entire page is already parsed
return;
}
@ -764,14 +826,14 @@ namespace Mist {
//Otherwise open and parse the page
//Open the page if it is not yet open
if (!nProxy.curPageNum.count(tNum) || nProxy.curPageNum[tNum] != pageNum || !nProxy.curPage[tNum].mapped){
if (!nProxy.curPageNum.count(tNum) || nProxy.curPageNum[tNum] != pageNum || !nProxy.curPage[tNum].mapped) {
//DO NOT ERASE THE PAGE HERE, master is not set to true
nProxy.curPageNum.erase(tNum);
char nextPageName[NAME_BUFFER_SIZE];
snprintf(nextPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tNum, pageNum);
nProxy.curPage[tNum].init(nextPageName, 20971520);
//If the page can not be opened, stop here
if (!nProxy.curPage[tNum].mapped){
if (!nProxy.curPage[tNum].mapped) {
WARN_MSG("Could not open page: %s", nextPageName);
return;
}
@ -780,21 +842,21 @@ namespace Mist {
DTSC::Packet tmpPack;
if (!nProxy.curPage[tNum].mapped[pageData.curOffset]){
if (!nProxy.curPage[tNum].mapped[pageData.curOffset]) {
VERYHIGH_MSG("No packet on page %lu for track %lu, waiting...", pageNum, tNum);
return;
}
tmpPack.reInit(nProxy.curPage[tNum].mapped + pageData.curOffset, 0);
//No new data has been written on the page since last update
if (!tmpPack){
if (!tmpPack) {
return;
}
lastUpdated[tNum] = Util::bootSecs();
while (tmpPack){
while (tmpPack) {
//Update the metadata with this packet
myMeta.update(tmpPack, segmentSize);/*LTS*/
//Set the first time when appropriate
if (pageData.firstTime == 0){
if (pageData.firstTime == 0) {
pageData.firstTime = tmpPack.getTime();
}
//Update the offset on the page with the size of the current packet
@ -804,7 +866,7 @@ namespace Mist {
}
}
bool inputBuffer::setup(){
bool inputBuffer::setup() {
lastReTime = Util::epoch(); /*LTS*/
std::string strName = config->getString("streamname");
Util::sanitizeName(strName);
@ -816,10 +878,10 @@ namespace Mist {
long long tmpNum;
//if stream is configured and setting is present, use it, always
if (streamCfg && streamCfg.getMember("DVR")){
if (streamCfg && streamCfg.getMember("DVR")) {
tmpNum = streamCfg.getMember("DVR").asInt();
} else {
if (streamCfg){
if (streamCfg) {
//otherwise, if stream is configured use the default
tmpNum = config->getOption("bufferTime", true)[0u].asInt();
} else {
@ -827,19 +889,21 @@ namespace Mist {
tmpNum = config->getOption("bufferTime").asInt();
}
}
if (tmpNum < 1000){tmpNum = 1000;}
if (tmpNum < 1000) {
tmpNum = 1000;
}
//if the new value is different, print a message and apply it
if (bufferTime != tmpNum){
if (bufferTime != tmpNum) {
DEBUG_MSG(DLVL_DEVEL, "Setting bufferTime from %u to new value of %lli", bufferTime, tmpNum);
bufferTime = tmpNum;
}
/*LTS-START*/
//if stream is configured and setting is present, use it, always
if (streamCfg && streamCfg.getMember("cut")){
if (streamCfg && streamCfg.getMember("cut")) {
tmpNum = streamCfg.getMember("cut").asInt();
} else {
if (streamCfg){
if (streamCfg) {
//otherwise, if stream is configured use the default
tmpNum = config->getOption("cut", true)[0u].asInt();
} else {
@ -848,16 +912,16 @@ namespace Mist {
}
}
//if the new value is different, print a message and apply it
if (cutTime != tmpNum){
if (cutTime != tmpNum) {
DEBUG_MSG(DLVL_DEVEL, "Setting cutTime from %u to new value of %lli", cutTime, tmpNum);
cutTime = tmpNum;
}
//if stream is configured and setting is present, use it, always
if (streamCfg && streamCfg.getMember("resume")){
if (streamCfg && streamCfg.getMember("resume")) {
tmpNum = streamCfg.getMember("resume").asInt();
} else {
if (streamCfg){
if (streamCfg) {
//otherwise, if stream is configured use the default
tmpNum = config->getOption("resume", true)[0u].asInt();
} else {
@ -866,16 +930,16 @@ namespace Mist {
}
}
//if the new value is different, print a message and apply it
if (resumeMode != (bool)tmpNum){
DEBUG_MSG(DLVL_DEVEL, "Setting resume mode from %s to new value of %s", resumeMode?"enabled":"disabled", tmpNum?"enabled":"disabled");
if (resumeMode != (bool)tmpNum) {
DEBUG_MSG(DLVL_DEVEL, "Setting resume mode from %s to new value of %s", resumeMode ? "enabled" : "disabled", tmpNum ? "enabled" : "disabled");
resumeMode = tmpNum;
}
//if stream is configured and setting is present, use it, always
if (streamCfg && streamCfg.getMember("segmentsize")){
if (streamCfg && streamCfg.getMember("segmentsize")) {
tmpNum = streamCfg.getMember("segmentsize").asInt();
} else {
if (streamCfg){
if (streamCfg) {
//otherwise, if stream is configured use the default
tmpNum = config->getOption("segmentsize", true)[0u].asInt();
} else {
@ -884,7 +948,7 @@ namespace Mist {
}
}
//if the new value is different, print a message and apply it
if (segmentSize != tmpNum){
if (segmentSize != tmpNum) {
DEBUG_MSG(DLVL_DEVEL, "Setting segmentSize from %u to new value of %lli", segmentSize, tmpNum);
segmentSize = tmpNum;
}
@ -929,8 +993,8 @@ namespace Mist {
bool has_keyframes = false;
std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin();
while (it != myMeta.tracks.end()) {
DTSC::Track& tr = it->second;
DTSC::Track & tr = it->second;
if (tr.type != "video") {
++it;
continue;
@ -943,33 +1007,23 @@ namespace Mist {
++it;
}
if (streamCfg
&& streamCfg.getMember("record")
&& streamCfg.getMember("record").asString().size() > 0
&& has_keyframes
)
{
if (streamCfg && streamCfg.getMember("record") && streamCfg.getMember("record").asString().size() > 0 && has_keyframes) {
// @todo check if output is already running ?
if (recordingPid == -1 && config != NULL){
configLock.post();
configLock.close();
INFO_MSG("The stream %s has a value specified for the recording. "
"We're going to start an output and record into %s",
config->getString("streamname").c_str(),
streamCfg.getMember("record").asString().c_str());
// @todo check if output is already running ?
if (recordingPid == -1 && config != NULL) {
recordingPid = Util::startRecording(config->getString("streamname"));
if (recordingPid < 0) {
FAIL_MSG("Failed to start the recording for %s", config->getString("streamname").c_str());
// @todo shouldn't we do configList.post(), configLock.close() and return false?
// @todo discuss with Jaron. 2015.09.26, remove this comment when discussed.
}
INFO_MSG("We started an output for recording with PID: %d", recordingPid);
return true;
INFO_MSG("The stream %s has a value specified for the recording. We're goint to start an output and record into %s", config->getString("streamname").c_str(), streamCfg.getMember("record").asString().c_str());
configLock.post();
configLock.close();
recordingPid = Util::startRecording(config->getString("streamname"));
if (recordingPid < 0) {
FAIL_MSG("Failed to start the recording for %s", config->getString("streamname").c_str());
}
INFO_MSG("We started an output for recording with PID: %d", recordingPid);
return true;
}
}
/* roxlu-end */
@ -979,15 +1033,15 @@ namespace Mist {
return true;
}
bool inputBuffer::readHeader(){
bool inputBuffer::readHeader() {
return true;
}
void inputBuffer::getNext(bool smart){}
void inputBuffer::getNext(bool smart) {}
void inputBuffer::seek(int seekTime){}
void inputBuffer::seek(int seekTime) {}
void inputBuffer::trackSelect(std::string trackSpec){}
void inputBuffer::trackSelect(std::string trackSpec) {}
}

View file

@ -7,6 +7,9 @@
#include <mist/stream.h>
#include <mist/defines.h>
#include <mist/util.h>
#include <mist/bitfields.h>
#include "input_dtsc.h"
namespace Mist {
@ -15,6 +18,7 @@ namespace Mist {
capa["desc"] = "Enables DTSC Input";
capa["priority"] = 9ll;
capa["source_match"] = "/*.dtsc";
capa["stream_match"] = "dtsc://*";
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("H263");
capa["codecs"][0u][0u].append("VP6");
@ -22,59 +26,253 @@ namespace Mist {
capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("MP3");
capa["codecs"][0u][1u].append("vorbis");
JSON::Value option;
option["long"] = "pull";
option["short"] = "p";
option["help"] = "Start this input in pull mode.";
option["value"].append(0ll);
config->addOption("pull", option);
}
void parseDTSCURI(const std::string & src, std::string & host, uint16_t & port, std::string & password, std::string & streamName) {
host = "";
port = 4200;
password = "";
streamName = "";
std::deque<std::string> matches;
if (Util::stringScan(src, "%s:%s@%s/%s", matches)) {
host = matches[0];
port = atoi(matches[1].c_str());
password = matches[2];
streamName = matches[3];
return;
}
//Using default streamname
if (Util::stringScan(src, "%s:%s@%s", matches)) {
host = matches[0];
port = atoi(matches[1].c_str());
password = matches[2];
return;
}
//Without password
if (Util::stringScan(src, "%s:%s/%s", matches)) {
host = matches[0];
port = atoi(matches[1].c_str());
streamName = matches[2];
return;
}
//Using default port
if (Util::stringScan(src, "%s@%s/%s", matches)) {
host = matches[0];
password = matches[1];
streamName = matches[2];
return;
}
//Default port, no password
if (Util::stringScan(src, "%s/%s", matches)) {
host = matches[0];
streamName = matches[1];
return;
}
//No password, default streamname
if (Util::stringScan(src, "%s:%s", matches)) {
host = matches[0];
port = atoi(matches[1].c_str());
return;
}
//Default port and streamname
if (Util::stringScan(src, "%s@%s", matches)) {
host = matches[0];
password = matches[1];
return;
}
//Default port and streamname, no password
if (Util::stringScan(src, "%s", matches)) {
host = matches[0];
return;
}
}
void inputDTSC::parseStreamHeader() {
while (srcConn.connected()){
srcConn.spool();
if (srcConn.Received().available(8)){
if (srcConn.Received().copy(4) == "DTCM" || srcConn.Received().copy(4) == "DTSC") {
// Command message
std::string toRec = srcConn.Received().copy(8);
unsigned long rSize = Bit::btohl(toRec.c_str() + 4);
if (!srcConn.Received().available(8 + rSize)) {
continue; //abort - not enough data yet
}
//Ignore initial DTCM message, as this is a "hi" message from the server
if (srcConn.Received().copy(4) == "DTCM"){
srcConn.Received().remove(8 + rSize);
}else{
std::string dataPacket = srcConn.Received().remove(8+rSize);
DTSC::Packet metaPack(dataPacket.data(), dataPacket.size());
myMeta.reinit(metaPack);
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
continueNegotiate(it->first, true);
}
break;
}
}else{
INFO_MSG("Received a wrong type of packet - '%s'", srcConn.Received().copy(4).c_str());
break;
}
}
}
}
bool inputDTSC::openStreamSource() {
std::string source = config->getString("input");
if (source.find("dtsc://") == 0) {
source.erase(0, 7);
}
std::string host;
uint16_t port;
std::string password;
std::string streamName;
parseDTSCURI(source, host, port, password, streamName);
std::string givenStream = config->getString("streamname");
if (streamName == "") {
streamName = givenStream;
}else{
if (givenStream.find("+") != std::string::npos){
streamName += givenStream.substr(givenStream.find("+"));
}
}
srcConn = Socket::Connection(host, port, true);
if (!srcConn.connected()){
return false;
}
JSON::Value prep;
prep["cmd"] = "play";
prep["version"] = "MistServer " PACKAGE_VERSION;
prep["stream"] = streamName;
srcConn.SendNow("DTCM");
char sSize[4] = {0, 0, 0, 0};
Bit::htobl(sSize, prep.packedSize());
srcConn.SendNow(sSize, 4);
prep.sendTo(srcConn);
return true;
}
void inputDTSC::closeStreamSource(){
srcConn.close();
}
bool inputDTSC::setup() {
if (config->getString("input") == "-") {
std::cerr << "Input from stdin not yet supported" << std::endl;
return false;
}
if (!config->getString("streamname").size()){
if (config->getString("output") == "-") {
std::cerr << "Output to stdout not yet supported" << std::endl;
if (streamMode) {
return true;
} else {
if (config->getString("input") == "-") {
std::cerr << "Input from stdin not yet supported" << std::endl;
return false;
}
}else{
if (config->getString("output") != "-") {
std::cerr << "File output in player mode not supported" << std::endl;
if (!config->getString("streamname").size()) {
if (config->getString("output") == "-") {
std::cerr << "Output to stdout not yet supported" << std::endl;
return false;
}
} else {
if (config->getString("output") != "-") {
std::cerr << "File output in player mode not supported" << std::endl;
return false;
}
}
//open File
inFile = DTSC::File(config->getString("input"));
if (!inFile) {
return false;
}
}
//open File
inFile = DTSC::File(config->getString("input"));
if (!inFile) {
return false;
}
return true;
}
bool inputDTSC::readHeader() {
if (streamMode) {
return true;
}
if (!inFile) {
return false;
}
DTSC::File tmp(config->getString("input") + ".dtsh");
if (tmp) {
myMeta = tmp.getMeta();
DEBUG_MSG(DLVL_HIGH,"Meta read in with %lu tracks", myMeta.tracks.size());
DEBUG_MSG(DLVL_HIGH, "Meta read in with %lu tracks", myMeta.tracks.size());
return true;
}
if (inFile.getMeta().moreheader < 0 || inFile.getMeta().tracks.size() == 0) {
DEBUG_MSG(DLVL_FAIL,"Missing external header file");
DEBUG_MSG(DLVL_FAIL, "Missing external header file");
return false;
}
myMeta = DTSC::Meta(inFile.getMeta());
DEBUG_MSG(DLVL_DEVEL,"Meta read in with %lu tracks", myMeta.tracks.size());
DEBUG_MSG(DLVL_DEVEL, "Meta read in with %lu tracks", myMeta.tracks.size());
return true;
}
void inputDTSC::getNext(bool smart) {
if (smart){
inFile.seekNext();
if (streamMode){
thisPacket.reInit(srcConn);
if (thisPacket.getVersion() == DTSC::DTCM){
std::string cmd;
thisPacket.getString("cmd", cmd);
if (cmd == "reset"){
//Read next packet
thisPacket.reInit(srcConn);
if (thisPacket.getVersion() == DTSC::DTSC_HEAD){
DTSC::Meta newMeta;
newMeta.reinit(thisPacket);
//Detect new tracks
std::set<unsigned int> newTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = newMeta.tracks.begin(); it != newMeta.tracks.end(); it++){
if (!myMeta.tracks.count(it->first)){
newTracks.insert(it->first);
}
}
for (std::set<unsigned int>::iterator it = newTracks.begin(); it != newTracks.end(); it++){
INFO_MSG("Adding track %d to internal metadata", *it);
myMeta.tracks[*it] = newMeta.tracks[*it];
continueNegotiate(*it, true);
}
//Detect removed tracks
std::set<unsigned int> deletedTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!newMeta.tracks.count(it->first)){
deletedTracks.insert(it->first);
}
}
for(std::set<unsigned int>::iterator it = deletedTracks.begin(); it != deletedTracks.end(); it++){
INFO_MSG("Deleting track %d from internal metadata", *it);
myMeta.tracks.erase(*it);
}
//Read next packet before returning
thisPacket.reInit(srcConn);
}else{
myMeta = DTSC::Meta();
}
}else{
//Read next packet before returning
thisPacket.reInit(srcConn);
}
}
}else{
inFile.parseNext();
if (smart) {
inFile.seekNext();
} else {
inFile.parseNext();
}
thisPacket = inFile.getPacket();
}
thisPacket = inFile.getPacket();
}
void inputDTSC::seek(int seekTime) {

View file

@ -7,6 +7,9 @@ namespace Mist {
inputDTSC(Util::Config * cfg);
protected:
//Private Functions
bool openStreamSource();
void closeStreamSource();
void parseStreamHeader();
bool setup();
bool readHeader();
void getNext(bool smart = true);
@ -14,6 +17,8 @@ namespace Mist {
void trackSelect(std::string trackSpec);
DTSC::File inFile;
Socket::Connection srcConn;
};
}

View file

@ -441,6 +441,10 @@ namespace Mist {
}
} while (threadCount);
}
bool inputTS::needsLock() {
return false;
}
#endif
}

View file

@ -13,6 +13,9 @@ namespace Mist {
public:
inputTS(Util::Config * cfg);
~inputTS();
#ifdef TSLIVE_INPUT
bool needsLock();
#endif
protected:
//Private Functions
bool setup();

View file

@ -17,32 +17,33 @@ int main(int argc, char * argv[]) {
if (conf.parseArgs(argc, argv)) {
std::string streamName = conf.getString("streamname");
conv.argumentsParsed();
#ifndef INPUT_NOLOCK
IPC::semaphore playerLock;
if (streamName.size()){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str());
playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
return 1;
if (conv.needsLock()){
if (streamName.size()){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str());
playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
return 1;
}
}
}
#endif
conf.activate();
while (conf.is_active){
pid_t pid = fork();
if (pid == 0){
#ifndef INPUT_NOLOCK
playerLock.close();
#endif
if (conv.needsLock()){
playerLock.close();
}
return conv.run();
}
if (pid == -1){
DEBUG_MSG(DLVL_FAIL, "Unable to spawn player process");
#ifndef INPUT_NOLOCK
playerLock.post();
#endif
if (conv.needsLock()){
playerLock.post();
}
return 2;
}
//wait for the process to exit
@ -71,11 +72,11 @@ int main(int argc, char * argv[]) {
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str());
}
}
#ifndef INPUT_NOLOCK
playerLock.post();
playerLock.unlink();
playerLock.close();
#endif
if (conv.needsLock()){
playerLock.post();
playerLock.unlink();
playerLock.close();
}
}
return 0;
}

View file

@ -559,10 +559,11 @@ namespace Mist {
bufferNext(packet, myMeta);
}
void InOutBase::continueNegotiate(unsigned long tid) {
nProxy.continueNegotiate(tid, myMeta);
void InOutBase::continueNegotiate(unsigned long tid, bool quickNegotiate) {
nProxy.continueNegotiate(tid, myMeta, quickNegotiate);
}
void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta) {
void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate) {
if (!tid) {
return;
}
@ -618,12 +619,49 @@ namespace Mist {
unsigned long offset = 6 * trackOffset[tid];
//If we have a new track to negotiate
if (!trackState.count(tid)) {
INFO_MSG("Starting negotiation for incoming track %lu, at offset %lu", tid, trackOffset[tid]);
memset(tmp + offset, 0, 4);
tmp[offset] = 0x80;
tmp[offset + 4] = ((tid >> 8) & 0xFF);
tmp[offset + 5] = (tid & 0xFF);
trackState[tid] = FILL_NEW;
if (quickNegotiate){
unsigned long finalTid = getpid() + tid;
unsigned short firstPage = 1;
INFO_MSG("HANDLING quick negotiation for track %d ~> %d", tid, finalTid)
MEDIUM_MSG("Buffer has indicated that incoming track %lu should start writing on track %lu, page %lu", tid, finalTid, firstPage);
trackMap[tid] = finalTid;
if (myMeta.tracks.count(finalTid) && myMeta.tracks[finalTid].lastms){
myMeta.tracks[finalTid].lastms = 0;
}
trackState[tid] = FILL_ACC;
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), finalTid);
metaPages[tid].init(pageName, 8 * 1024 * 1024, true);
metaPages[tid].master = false;
DTSC::Meta tmpMeta;
tmpMeta.tracks[finalTid] = myMeta.tracks[tid];
tmpMeta.tracks[finalTid].trackID = finalTid;
JSON::Value tmpVal = tmpMeta.toJSON();
std::string tmpStr = tmpVal.toNetPacked();
memcpy(metaPages[tid].mapped, tmpStr.data(), tmpStr.size());
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), finalTid);
metaPages[tid].init(pageName, 8 * 1024 * 1024, true);
metaPages[tid].master = false;
Bit::htobl(tmp + offset, finalTid | 0xC0000000);
Bit::htobs(tmp + offset + 4, firstPage);
}else{
INFO_MSG("Starting negotiation for incoming track %lu, at offset %lu", tid, trackOffset[tid]);
memset(tmp + offset, 0, 4);
tmp[offset] = 0x80;
tmp[offset + 4] = ((tid >> 8) & 0xFF);
tmp[offset + 5] = (tid & 0xFF);
trackState[tid] = FILL_NEW;
}
return;
}
#if defined(__CYGWIN__) || defined(_WIN32)

View file

@ -59,7 +59,7 @@ namespace Mist {
std::map<int,unsigned long long int> iVecs;
IPC::sharedPage encryptionPage;
void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta);
void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate = false);
};
///\brief Class containing all basic input and output functions.
@ -74,7 +74,7 @@ namespace Mist {
void bufferLivePacket(JSON::Value & packet);
void bufferLivePacket(DTSC::Packet & packet);
protected:
void continueNegotiate(unsigned long tid);
void continueNegotiate(unsigned long tid, bool quickNegotiate = false);

View file

@ -278,6 +278,16 @@ namespace Mist {
onFail();
return;
}
if (!source.size()){
std::string strName = streamName;
Util::sanitizeName(strName);
IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName);
source = streamCfg.getMember("source").asString();
configLock.post();
}
char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
nProxy.metaPages.clear();
@ -416,9 +426,20 @@ namespace Mist {
// when we don't see this explicitly it makes debugging the recording feature
// a bit painfull :)
if (selectedTracks.size() == 0) {
WARN_MSG("We didn't find any tracks which that we can use. selectedTrack.size() is 0.");
INSANE_MSG("We didn't find any tracks which that we can use. selectedTrack.size() is 0.");
for (std::map<unsigned int,DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){
WARN_MSG("Found track/codec: %s", trit->second.codec.c_str());
INSANE_MSG("Found track/codec: %s", trit->second.codec.c_str());
}
if (!myMeta.tracks.size() && (source.find("dtsc://") == 0)){
//Wait 5 seconds and try again. Keep a counter, try at most 3 times
static int counter = 0;
if (counter++ < 10){
Util::wait(1000);
nProxy.userClient.keepAlive();
stats();
updateMeta();
selectDefaultTracks();
}
}
}
/*end-roxlu*/
@ -898,6 +919,25 @@ namespace Mist {
}
if ( !sentHeader){
DEBUG_MSG(DLVL_DONTEVEN, "sendHeader");
bool waitLonger = false;
if (!myMeta.tracks.size()){
waitLonger = true;
}else{
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!it->second.keys.size()){
waitLonger = true;
break;
}
}
}
if (waitLonger){
updateMeta();
Util::sleep(1000);
static unsigned int metaTries = 0;
if(++metaTries < 7){
continue;
}
}
sendHeader();
}
prepareNext();

View file

@ -108,6 +108,7 @@ namespace Mist {
bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext().
bool completeKeyReadyTimeOut;//a bool to see if there has been a keyframe TimeOut for complete keys in Live
protected://these are to be messed with by child classes
std::string source;
virtual std::string getConnectedHost();
virtual std::string getConnectedBinHost();

View file

@ -29,6 +29,7 @@ namespace Mist {
myConn.SendNow(sSize, 4);
prep.sendTo(myConn);
pushing = false;
fastAsPossibleTime = 0;
}
OutDTSC::~OutDTSC() {}
@ -44,12 +45,52 @@ namespace Mist {
}
void OutDTSC::sendNext(){
if (!realTime && thisPacket.getTime() >= fastAsPossibleTime){
realTime = 1000;
}
if (thisPacket.getFlag("keyframe")){
std::set<unsigned long> availableTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "video" || it->second.type == "audio"){
availableTracks.insert(it->first);
}
}
if (availableTracks != selectedTracks){
//reset, resendheader
JSON::Value prep;
prep["cmd"] = "reset";
/// \todo Make this securererer.
unsigned long sendSize = prep.packedSize();
myConn.SendNow("DTCM");
char sSize[4] = {0, 0, 0, 0};
Bit::htobl(sSize, prep.packedSize());
myConn.SendNow(sSize, 4);
prep.sendTo(myConn);
}
}
myConn.SendNow(thisPacket.getData(), thisPacket.getDataLen());
}
void OutDTSC::sendHeader(){
sentHeader = true;
myMeta.send(myConn, true);
selectedTracks.clear();
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "video" || it->second.type == "audio"){
selectedTracks.insert(it->first);
}
}
myMeta.send(myConn, true, selectedTracks);
if (myMeta.live){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!fastAsPossibleTime || it->second.lastms < fastAsPossibleTime){
fastAsPossibleTime = it->second.lastms;
realTime = 0;
}
}
}else{
fastAsPossibleTime = 50000;//50 seconds
realTime = 0;
}
}
void OutDTSC::onRequest(){
@ -76,6 +117,8 @@ namespace Mist {
streamName = dScan.getMember("stream").asString();
Util::sanitizeName(streamName);
parseData = true;
INFO_MSG("Handled play for stream %s", streamName.c_str());
setBlocking(false);
}
void OutDTSC::handlePush(DTSC::Scan & dScan){

View file

@ -15,6 +15,7 @@ namespace Mist {
bool pushing;
void handlePush(DTSC::Scan & dScan);
void handlePlay(DTSC::Scan & dScan);
unsigned long long fastAsPossibleTime;
};
}

View file

@ -6,6 +6,27 @@ namespace Mist {
///\brief Builds an index file for HTTP Live streaming.
///\return The index file for HTTP Live Streaming.
std::string OutHLS::liveIndex() {
static int timer = 0;
bool checkWait = true;
while (checkWait && ++timer < 10){
checkWait = false;
if (!myMeta.tracks.size()){
checkWait = true;
}
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.keys.size() <= 3){
checkWait = true;
break;
}
}
if (checkWait){
Util::sleep(500);
INFO_MSG("SLeeping timer %d", timer);
updateMeta();
}
}
std::stringstream result;
result << "#EXTM3U\r\n";
int audioId = -1;

View file

@ -43,6 +43,9 @@ namespace Mist {
///\todo This function does not indicate errors anywhere... maybe fix this...
std::string OutProgressiveMP4::DTSCMeta2MP4Header(long long & size, int fragmented) {
if (myMeta.live){
completeKeysOnly = true;
}
//Make sure we have a proper being value for the size...
size = 0;
//Stores the result of the function
@ -745,6 +748,28 @@ namespace Mist {
void OutProgressiveMP4::setvidTrack() {
vidTrack = 0;
static int timer = 0;
bool checkWait = true;
while (checkWait && ++timer < 10){
checkWait = false;
if (!myMeta.tracks.size()){
checkWait = true;
}
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!it->second.keys.size()){
checkWait = true;
break;
}
}
if (checkWait){
Util::sleep(500);
updateMeta();
}
}
if (!selectedTracks.size()){
selectDefaultTracks();
}
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) {
//Find video track
if (myMeta.tracks[*it].type == "video") {