From 668560ff05933cc5f1654fbd5b2b31ce4a518310 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 15 Feb 2016 11:48:45 +0100 Subject: [PATCH] Added skipDynamic optional argument to most binary representations of metadata/tracks, which skips sending dynamic parts of the metadata if true. --- CMakeLists.txt | 1 + flow_input | 20 +++++ lib/dtsc.cpp | 53 +++--------- lib/dtsc.h | 15 ++-- lib/dtscmeta.cpp | 112 +++++++++++++----------- src/analysers/dtsc_analyser.cpp | 8 +- src/input/input.cpp | 52 +++++++++++ src/output/output.cpp | 13 ++- src/output/output_dtsc.cpp | 147 ++++++++++++++++++++++++++++++++ src/output/output_dtsc.h | 22 +++++ src/output/output_raw.cpp | 12 +-- 11 files changed, 344 insertions(+), 111 deletions(-) create mode 100644 flow_input create mode 100644 src/output/output_dtsc.cpp create mode 100644 src/output/output_dtsc.h diff --git a/CMakeLists.txt b/CMakeLists.txt index e10467cc..ca826dfd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -359,6 +359,7 @@ macro(makeOutput outputName format) endmacro() makeOutput(RTMP rtmp) +makeOutput(DTSC dtsc) makeOutput(OGG progressive_ogg http) makeOutput(FLV progressive_flv http) makeOutput(HTTPMinimalServer http_minimalserver http) diff --git a/flow_input b/flow_input new file mode 100644 index 00000000..3aa9e66f --- /dev/null +++ b/flow_input @@ -0,0 +1,20 @@ + +- Construct input +- Parse arguments +- Stream wordt gelocked IFF !nolock +- Start .run() +- setup(): opent files/sockets/etc waar nodig + - set "isStream" naar true +- checkHeaderTimes(): delete .dtsh file als ouder dan input file +- readHeader(): lees header naar interne metadata +- parseHeader(): parse interne metadata +- convert indien geen stream, serve indien stream + +serve: +- + +stream: +- start buffer +- pull data in +- parse data to stream + diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 2b4d0a11..ad02c164 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -9,6 +9,7 @@ char DTSC::Magic_Header[] = "DTSC"; char DTSC::Magic_Packet[] = "DTPD"; char DTSC::Magic_Packet2[] = "DTP2"; +char DTSC::Magic_Command[] = "DTCM"; DTSC::File::File() { F = 0; @@ -32,8 +33,7 @@ DTSC::File & DTSC::File::operator =(const File & rhs) { if (rhs.myPack) { myPack = rhs.myPack; } - metaStorage = rhs.metaStorage; - metadata = metaStorage; + metadata = rhs.metadata; currtime = rhs.currtime; lastreadpos = rhs.lastreadpos; headerSize = rhs.headerSize; @@ -67,7 +67,7 @@ DTSC::File::File(std::string filename, bool create) { } created = create; if (!F) { - DEBUG_MSG(DLVL_ERROR, "Could not open file %s", filename.c_str()); + HIGH_MSG("Could not open file %s", filename.c_str()); return; } fseek(F, 0, SEEK_END); @@ -83,7 +83,7 @@ DTSC::File::File(std::string filename, bool create) { return; } if (memcmp(buffer, DTSC::Magic_Header, 4) != 0) { - if (memcmp(buffer, DTSC::Magic_Packet2, 4) != 0 && memcmp(buffer, DTSC::Magic_Packet, 4) != 0) { + if (memcmp(buffer, DTSC::Magic_Packet2, 4) != 0 && memcmp(buffer, DTSC::Magic_Packet, 4) != 0 && memcmp(buffer, DTSC::Magic_Command, 4) != 0) { DEBUG_MSG(DLVL_ERROR, "%s is not a valid DTSC file", filename.c_str()); fclose(F); F = 0; @@ -113,8 +113,7 @@ DTSC::File::File(std::string filename, bool create) { fseek(F, 0, SEEK_SET); File Fhead(filename + ".dtsh"); if (Fhead) { - metaStorage = Fhead.metaStorage; - metadata = metaStorage; + metadata = Fhead.metadata; } } currframe = 0; @@ -346,8 +345,9 @@ void DTSC::File::seekNext() { } void DTSC::File::parseNext(){ + char header_buffer[4] = {0, 0, 0, 0}; lastreadpos = ftell(F); - if (fread(buffer, 4, 1, F) != 1) { + if (fread(header_buffer, 4, 1, F) != 1) { if (feof(F)) { DEBUG_MSG(DLVL_DEVEL, "End of file reached @ %d", (int)lastreadpos); } else { @@ -356,55 +356,26 @@ void DTSC::File::parseNext(){ myPack.null(); return; } - if (memcmp(buffer, DTSC::Magic_Header, 4) == 0) { - if (lastreadpos != 0) { - readHeader(lastreadpos); - std::string tmp = metaStorage.toNetPacked(); - myPack.reInit(tmp.data(), tmp.size()); - DEBUG_MSG(DLVL_DEVEL, "Read another header"); - } else { - if (fread(buffer, 4, 1, F) != 1) { - DEBUG_MSG(DLVL_ERROR, "Could not read header size @ %d", (int)lastreadpos); - myPack.null(); - return; - } - long packSize = ntohl(((unsigned long *)buffer)[0]); - std::string strBuffer = "DTSC"; - strBuffer.append((char *)buffer, 4); - strBuffer.resize(packSize + 8); - if (fread((void *)(strBuffer.c_str() + 8), packSize, 1, F) != 1) { - DEBUG_MSG(DLVL_ERROR, "Could not read header @ %d", (int)lastreadpos); - myPack.null(); - return; - } - myPack.reInit(strBuffer.data(), strBuffer.size()); - } - return; - } long long unsigned int version = 0; - if (memcmp(buffer, DTSC::Magic_Packet, 4) == 0) { + if (memcmp(header_buffer, DTSC::Magic_Packet, 4) == 0 || memcmp(header_buffer, DTSC::Magic_Command, 4) == 0 || memcmp(header_buffer, DTSC::Magic_Header, 4) == 0) { version = 1; } - if (memcmp(buffer, DTSC::Magic_Packet2, 4) == 0) { + if (memcmp(header_buffer, DTSC::Magic_Packet2, 4) == 0) { version = 2; } if (version == 0) { - DEBUG_MSG(DLVL_ERROR, "Invalid packet header @ %#x - %.4s != %.4s @ %d", (unsigned int)lastreadpos, (char *)buffer, DTSC::Magic_Packet2, (int)lastreadpos); + DEBUG_MSG(DLVL_ERROR, "Invalid packet header @ %#x: %.4s", (unsigned int)lastreadpos, (char *)buffer); myPack.null(); return; } if (fread(buffer, 4, 1, F) != 1) { - DEBUG_MSG(DLVL_ERROR, "Could not read packet size @ %d", (int)lastreadpos); + DEBUG_MSG(DLVL_ERROR, "Could not read packet size @ %#x", (unsigned int)lastreadpos); myPack.null(); return; } long packSize = ntohl(((unsigned long *)buffer)[0]); char * packBuffer = (char *)malloc(packSize + 8); - if (version == 1) { - memcpy(packBuffer, "DTPD", 4); - } else { - memcpy(packBuffer, "DTP2", 4); - } + memcpy(packBuffer, header_buffer, 4); memcpy(packBuffer + 4, buffer, 4); if (fread((void *)(packBuffer + 8), packSize, 1, F) != 1) { DEBUG_MSG(DLVL_ERROR, "Could not read packet @ %d", (int)lastreadpos); diff --git a/lib/dtsc.h b/lib/dtsc.h index 0a1a39e2..fcf0152a 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -34,6 +34,7 @@ namespace DTSC { extern char Magic_Header[]; ///< The magic bytes for a DTSC header extern char Magic_Packet[]; ///< The magic bytes for a DTSC packet extern char Magic_Packet2[]; ///< The magic bytes for a DTSC packet version 2 + extern char Magic_Command[]; ///< The magic bytes for a DTCM packet ///\brief A simple structure used for ordering byte seek positions. struct seekPos { @@ -61,7 +62,8 @@ namespace DTSC { DTSC_INVALID, DTSC_HEAD, DTSC_V1, - DTSC_V2 + DTSC_V2, + DTCM }; /// This class allows scanning through raw binary format DTSC data. @@ -295,10 +297,10 @@ namespace DTSC { void update(long long packTime, long long packOffset, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size = 5000); */ void update(long long packTime, long long packOffset, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size = 5000, const char * iVec = 0); - int getSendLen(); - void send(Socket::Connection & conn); + int getSendLen(bool skipDynamic = false); + void send(Socket::Connection & conn, bool skipDynamic = false); void writeTo(char *& p); - JSON::Value toJSON(bool skipBinary = false); + JSON::Value toJSON(bool skipDynamic = false); std::deque fragments; std::deque keys; std::deque keySizes; @@ -352,8 +354,8 @@ namespace DTSC { void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000); LTS*/ void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000, const char * iVec = 0); - unsigned int getSendLen(); - void send(Socket::Connection & conn); + unsigned int getSendLen(bool skipDynamic = false); + void send(Socket::Connection & conn, bool skipDynamic = false); void writeTo(char * p); JSON::Value toJSON(); void reset(); @@ -398,7 +400,6 @@ namespace DTSC { long int endPos; void readHeader(int pos); DTSC::Packet myPack; - JSON::Value metaStorage; Meta metadata; std::map trackMapping; long long int currtime; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 2b6cc43c..c45ce903 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -159,8 +159,12 @@ namespace DTSC { if (!memcmp(data, Magic_Header, 4)) { version = DTSC_HEAD; } else { - DEBUG_MSG(DLVL_FAIL, "ReInit received a packet with invalid header"); - return; + if (!memcmp(data, Magic_Command, 4)) { + version = DTCM; + } else { + DEBUG_MSG(DLVL_FAIL, "ReInit received a packet with invalid header"); + return; + } } } } @@ -1510,15 +1514,17 @@ namespace DTSC { } ///\brief Determines the "packed" size of a track - int Track::getSendLen() { - int result = 146 + init.size() + codec.size() + type.size() + getWritableIdentifier().size(); - result += fragments.size() * PACKED_FRAGMENT_SIZE; - result += keys.size() * PACKED_KEY_SIZE; - if (keySizes.size()){ - result += 11 + (keySizes.size() * 4) + 4; + int Track::getSendLen(bool skipDynamic) { + int result = 107 + init.size() + codec.size() + type.size() + getWritableIdentifier().size(); + if (!skipDynamic){ + result += fragments.size() * PACKED_FRAGMENT_SIZE + 16; + result += keys.size() * PACKED_KEY_SIZE + 11; + if (keySizes.size()){ + result += (keySizes.size() * 4) + 15; + } + result += parts.size() * 9 + 12; + result += (ivecs.size() * 8) + 12; /*LTS*/ } - result += parts.size() * 9; - result += (ivecs.size() * 8) + 12; /*LTS*/ if (type == "audio") { result += 49; } else if (type == "video") { @@ -1624,46 +1630,48 @@ namespace DTSC { } ///\brief Writes a track to a socket - void Track::send(Socket::Connection & conn) { + void Track::send(Socket::Connection & conn, bool skipDynamic) { conn.SendNow(convertShort(getWritableIdentifier().size()), 2); conn.SendNow(getWritableIdentifier()); conn.SendNow("\340", 1);//Begin track object - conn.SendNow("\000\011fragments\002", 12); - conn.SendNow(convertInt(fragments.size() * PACKED_FRAGMENT_SIZE), 4); - for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { - conn.SendNow(it->getData(), PACKED_FRAGMENT_SIZE); + if (!skipDynamic){ + conn.SendNow("\000\011fragments\002", 12); + conn.SendNow(convertInt(fragments.size() * PACKED_FRAGMENT_SIZE), 4); + for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { + conn.SendNow(it->getData(), PACKED_FRAGMENT_SIZE); + } + conn.SendNow("\000\004keys\002", 7); + conn.SendNow(convertInt(keys.size() * PACKED_KEY_SIZE), 4); + for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { + conn.SendNow(it->getData(), PACKED_KEY_SIZE); + } + conn.SendNow("\000\010keysizes\002,", 11); + conn.SendNow(convertInt(keySizes.size() * 4), 4); + std::string tmp; + tmp.reserve(keySizes.size() * 4); + for (unsigned int i = 0; i < keySizes.size(); i++){ + tmp += (char)(keySizes[i] >> 24); + tmp += (char)(keySizes[i] >> 16); + tmp += (char)(keySizes[i] >> 8); + tmp += (char)(keySizes[i]); + } + conn.SendNow(tmp.data(), tmp.size()); + conn.SendNow("\000\005parts\002", 8); + conn.SendNow(convertInt(parts.size() * 9), 4); + for (std::deque::iterator it = parts.begin(); it != parts.end(); it++) { + conn.SendNow(it->getData(), 9); + } + /*LTS-START*/ + conn.SendNow("\000\005ivecs\002", 8); + conn.SendNow(convertInt(ivecs.size() * 8), 4); + for (std::deque::iterator it = ivecs.begin(); it != ivecs.end(); it++) { + conn.SendNow(it->getData(), 8); + } + /*LTS-END*/ } - conn.SendNow("\000\004keys\002", 7); - conn.SendNow(convertInt(keys.size() * PACKED_KEY_SIZE), 4); - for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { - conn.SendNow(it->getData(), PACKED_KEY_SIZE); - } - conn.SendNow("\000\010keysizes\002,", 11); - conn.SendNow(convertInt(keySizes.size() * 4), 4); - std::string tmp; - tmp.reserve(keySizes.size() * 4); - for (unsigned int i = 0; i < keySizes.size(); i++){ - tmp += (char)(keySizes[i] >> 24); - tmp += (char)(keySizes[i] >> 16); - tmp += (char)(keySizes[i] >> 8); - tmp += (char)(keySizes[i]); - } - conn.SendNow(tmp.data(), tmp.size()); - conn.SendNow("\000\005parts\002", 8); - conn.SendNow(convertInt(parts.size() * 9), 4); - for (std::deque::iterator it = parts.begin(); it != parts.end(); it++) { - conn.SendNow(it->getData(), 9); - } - /*LTS-START*/ - conn.SendNow("\000\005ivecs\002", 8); - conn.SendNow(convertInt(ivecs.size() * 8), 4); - for (std::deque::iterator it = ivecs.begin(); it != ivecs.end(); it++) { - conn.SendNow(it->getData(), 8); - } - /*LTS-END*/ conn.SendNow("\000\007trackid\001", 10); conn.SendNow(convertLongLong(trackID), 8); - if (missedFrags) { + if (!skipDynamic && missedFrags) { conn.SendNow("\000\014missed_frags\001", 15); conn.SendNow(convertLongLong(missedFrags), 8); } @@ -1701,10 +1709,10 @@ namespace DTSC { } ///\brief Determines the "packed" size of a meta object - unsigned int Meta::getSendLen() { + unsigned int Meta::getSendLen(bool skipDynamic) { unsigned int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { - dataLen += it->second.getSendLen(); + dataLen += it->second.getSendLen(skipDynamic); } return dataLen + 8; //add 8 bytes header } @@ -1741,13 +1749,13 @@ namespace DTSC { } ///\brief Writes a meta object to a socket - void Meta::send(Socket::Connection & conn) { - int dataLen = getSendLen() - 8; //strip 8 bytes header + void Meta::send(Socket::Connection & conn, bool skipDynamic) { + int dataLen = getSendLen(skipDynamic) - 8; //strip 8 bytes header conn.SendNow(DTSC::Magic_Header, 4); conn.SendNow(convertInt(dataLen), 4); conn.SendNow("\340\000\006tracks\340", 10); for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { - it->second.send(conn); + it->second.send(conn, skipDynamic); } conn.SendNow("\000\000\356", 3);//End tracks object if (vod) { @@ -1772,10 +1780,10 @@ namespace DTSC { } ///\brief Converts a track to a JSON::Value - JSON::Value Track::toJSON(bool skipBinary) { + JSON::Value Track::toJSON(bool skipDynamic) { JSON::Value result; std::string tmp; - if (!skipBinary) { + if (!skipDynamic) { tmp.reserve(fragments.size() * PACKED_FRAGMENT_SIZE); for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { tmp.append(it->getData(), PACKED_FRAGMENT_SIZE); @@ -1810,8 +1818,8 @@ namespace DTSC { } result["ivecs"] = tmp; /*LTS-END*/ - result["init"] = init; } + result["init"] = init; result["trackid"] = trackID; result["firstms"] = (long long)firstms; result["lastms"] = (long long)lastms; diff --git a/src/analysers/dtsc_analyser.cpp b/src/analysers/dtsc_analyser.cpp index e9462b78..fe886c83 100644 --- a/src/analysers/dtsc_analyser.cpp +++ b/src/analysers/dtsc_analyser.cpp @@ -23,7 +23,9 @@ namespace Analysers { std::cerr << "Not a valid DTSC file" << std::endl; return 1; } - F.getMeta().toPrettyString(std::cout,0, 0x03); + if (F.getMeta().vod || F.getMeta().live){ + F.getMeta().toPrettyString(std::cout,0, 0x03); + } int bPos = 0; F.seek_bpos(0); @@ -42,6 +44,10 @@ namespace Analysers { std::cout << "DTSC header: " << F.getPacket().getScan().toPrettyString() << std::endl; break; } + case DTSC::DTCM: { + std::cout << "DTCM command: " << F.getPacket().getScan().toPrettyString() << std::endl; + break; + } default: DEBUG_MSG(DLVL_WARN,"Invalid dtsc packet @ bpos %d", bPos); break; diff --git a/src/input/input.cpp b/src/input/input.cpp index c9878614..7536c185 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -240,6 +240,58 @@ namespace Mist { //end player functionality } + /// Main loop for stream-style inputs. + /// This loop will start the buffer without resume support, and then repeatedly call ..... followed by .... + void Input::stream(){ + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + /*LTS-START*/ + if(Triggers::shouldTrigger("STREAM_READY", config->getString("streamname"))){ + std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n"; + if (!Triggers::doTrigger("STREAM_READY", payload, config->getString("streamname"))){ + config->is_active = false; + } + } + /*LTS-END*/ + userPage.init(userPageName, PLAY_EX_SIZE, true); + if (!isBuffer) { + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { + bufferFrame(it->first, 1); + } + } + + DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); + + long long int activityCounter = Util::bootSecs(); + while ((Util::bootSecs() - activityCounter) < 10 && config->is_active) { //10 second timeout + userPage.parseEach(callbackWrapper); + removeUnused(); + if (userPage.amount) { + activityCounter = Util::bootSecs(); + DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount); + } else { + DEBUG_MSG(DLVL_INSANE, "Timer running"); + } + /*LTS-START*/ + if ((Util::bootSecs() - activityCounter) >= 10 || !config->is_active){//10 second timeout + if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){ + std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n"; + if (!Triggers::doTrigger("STREAM_UNLOAD", payload, config->getString("streamname"))){ + activityCounter = Util::bootSecs(); + config->is_active = true; + } + } + } + /*LTS-END*/ + if (config->is_active){ + Util::sleep(1000); + } + } + finish(); + DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str()); + //end player functionality + } + void Input::finish() { for (std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) { for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) { diff --git a/src/output/output.cpp b/src/output/output.cpp index 855d570c..35d562bf 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -338,10 +338,12 @@ namespace Mist { } if (!found){ for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - if (trit->second.codec == (*itc).asStringRef()){ + if (trit->second.codec == (*itc).asStringRef() || (*itc).asStringRef() == "*"){ genCounter++; found = true; - break; + if ((*itc).asStringRef() != "*"){ + break; + } } } } @@ -368,6 +370,7 @@ namespace Mist { if ((*itb).size() && myMeta.tracks.size()){ bool found = false; jsonForEach((*itb), itc) { + INFO_MSG("Filling codec: '%s'", (*itc).asStringRef().c_str()); if (found) { break; } @@ -379,10 +382,12 @@ namespace Mist { } if (!found){ for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - if (trit->second.codec == (*itc).asStringRef()){ + if (trit->second.codec == (*itc).asStringRef() || (*itc).asStringRef() == "*"){ selectedTracks.insert(trit->first); found = true; - break; + if ((*itc).asStringRef() != "*"){ + break; + } } } } diff --git a/src/output/output_dtsc.cpp b/src/output/output_dtsc.cpp new file mode 100644 index 00000000..f75b032d --- /dev/null +++ b/src/output/output_dtsc.cpp @@ -0,0 +1,147 @@ +#include "output_dtsc.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Mist { + OutDTSC::OutDTSC(Socket::Connection & conn) : Output(conn) { + setBlocking(true); + JSON::Value prep; + prep["cmd"] = "hi"; + prep["version"] = "MistServer " PACKAGE_VERSION; +#ifdef BIGMETA + prep["pack_method"] = 2ll; +#else + prep["pack_method"] = 1ll; +#endif + salt = Secure::md5("mehstuff"+JSON::Value((long long)time(0)).asString()); + prep["salt"] = salt; + /// \todo Make this securererer. + unsigned long sendSize = prep.packedSize(); + myConn.SendNow("DTCM"); + char sSize[4] = {0, 0, 0, 0}; + Bit::htobl(sSize, prep.packedSize()); + myConn.SendNow(sSize, 4); + prep.sendTo(myConn); + pushing = false; + } + + OutDTSC::~OutDTSC() {} + + void OutDTSC::init(Util::Config * cfg){ + Output::init(cfg); + capa["name"] = "DTSC"; + capa["desc"] = "Enables the DTSC protocol for efficient inter-server stream exchange."; + capa["deps"] = ""; + capa["codecs"][0u][0u].append("*"); + cfg->addConnectorOptions(4200, capa); + config = cfg; + } + + void OutDTSC::sendNext(){ + myConn.SendNow(thisPacket.getData(), thisPacket.getDataLen()); + } + + void OutDTSC::sendHeader(){ + sentHeader = true; + myMeta.send(myConn, true); + } + + void OutDTSC::onRequest(){ + while (myConn.Received().available(8)){ + if (myConn.Received().copy(4) == "DTCM"){ + // Command message + std::string toRec = myConn.Received().copy(8); + unsigned long rSize = Bit::btohl(toRec.c_str()+4); + if (!myConn.Received().available(8+rSize)){return;}//abort - not enough data yet + myConn.Received().remove(8); + std::string dataPacket = myConn.Received().remove(rSize); + DTSC::Scan dScan((char*)dataPacket.data(), rSize); + if (dScan.getMember("cmd").asString() == "push"){handlePush(dScan); continue;} + if (dScan.getMember("cmd").asString() == "play"){handlePlay(dScan); continue;} + WARN_MSG("Unhandled DTCM command: '%s'", dScan.getMember("cmd").asString().c_str()); + }else{ + // Non-command message + // + } + } + } + + void OutDTSC::handlePlay(DTSC::Scan & dScan){ + streamName = dScan.getMember("stream").asString(); + Util::sanitizeName(streamName); + parseData = true; + } + + void OutDTSC::handlePush(DTSC::Scan & dScan){ + streamName = dScan.getMember("stream").asString(); + std::string passString = dScan.getMember("password").asString(); + + Util::sanitizeName(streamName); + //pull the server configuration + std::string smp = streamName.substr(0,(streamName.find_first_of("+ "))); + IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE); ///< Contains server configuration and capabilities + IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + configLock.wait(); + + DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(smp); + if (streamCfg){ + if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){ + DEBUG_MSG(DLVL_FAIL, "Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str()); + myConn.close(); + }else{ + std::string source = streamCfg.getMember("source").asString().substr(7); + std::string IP = source.substr(0, source.find('@')); + /*LTS-START*/ + std::string password; + if (source.find('@') != std::string::npos){ + password = source.substr(source.find('@')+1); + if (password != ""){ + if (passString == Secure::md5(salt + password)){ + DEBUG_MSG(DLVL_DEVEL, "Password accepted - ignoring IP settings."); + IP = ""; + }else{ + DEBUG_MSG(DLVL_DEVEL, "Password rejected - checking IP."); + if (IP == ""){ + IP = "deny-all.invalid"; + } + } + } + } + if(Triggers::shouldTrigger("STREAM_PUSH", smp)){ + std::string payload = streamName+"\n" + myConn.getHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl; + if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){ + DEBUG_MSG(DLVL_FAIL, "Push from %s to %s rejected - STREAM_PUSH trigger denied the push", myConn.getHost().c_str(), streamName.c_str()); + myConn.close(); + configLock.post(); + configLock.close(); + return; + } + } + /*LTS-END*/ + if (IP != ""){ + if (!myConn.isAddress(IP)){ + DEBUG_MSG(DLVL_FAIL, "Push from %s to %s rejected - source host not whitelisted", myConn.getHost().c_str(), streamName.c_str()); + myConn.close(); + } + } + } + }else{ + DEBUG_MSG(DLVL_FAIL, "Push from %s rejected - stream '%s' not configured.", myConn.getHost().c_str(), streamName.c_str()); + myConn.close(); + } + configLock.post(); + configLock.close(); + if (!myConn){return;}//do not initialize if rejected + initialize(); + pushing = true; + } + + +} + diff --git a/src/output/output_dtsc.h b/src/output/output_dtsc.h new file mode 100644 index 00000000..ba01630b --- /dev/null +++ b/src/output/output_dtsc.h @@ -0,0 +1,22 @@ +#include "output.h" + +namespace Mist { + + class OutDTSC : public Output { + public: + OutDTSC(Socket::Connection & conn); + ~OutDTSC(); + static void init(Util::Config * cfg); + void onRequest(); + void sendNext(); + void sendHeader(); + private: + std::string salt; + bool pushing; + void handlePush(DTSC::Scan & dScan); + void handlePlay(DTSC::Scan & dScan); + }; +} + +typedef Mist::OutDTSC mistOut; + diff --git a/src/output/output_raw.cpp b/src/output/output_raw.cpp index 366b0a4b..af9fc029 100644 --- a/src/output/output_raw.cpp +++ b/src/output/output_raw.cpp @@ -4,11 +4,11 @@ namespace Mist { OutRaw::OutRaw(Socket::Connection & conn) : Output(conn) { streamName = config->getString("streamname"); initialize(); - selectedTracks.clear(); std::string tracks = config->getString("tracks"); - unsigned int currTrack = 0; - //loop over tracks, add any found track IDs to selectedTracks - if (tracks != ""){ + if (tracks.size()){ + selectedTracks.clear(); + unsigned int currTrack = 0; + //loop over tracks, add any found track IDs to selectedTracks for (unsigned int i = 0; i < tracks.size(); ++i){ if (tracks[i] >= '0' && tracks[i] <= '9'){ currTrack = currTrack*10 + (tracks[i] - '0'); @@ -46,8 +46,7 @@ namespace Mist { capa["optional"]["seek"]["help"] = "The time in milliseconds to seek to, 0 by default."; capa["optional"]["seek"]["type"] = "int"; capa["optional"]["seek"]["option"] = "--seek"; - capa["codecs"][0u][0u].append("H264"); - capa["codecs"][0u][1u].append("AAC"); + capa["codecs"][0u][0u].append("*"); cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}")); cfg->addOption("tracks", @@ -68,3 +67,4 @@ namespace Mist { } } +