Added skipDynamic optional argument to most binary representations of metadata/tracks, which skips sending dynamic parts of the metadata if true.

This commit is contained in:
Thulinma 2016-02-15 11:48:45 +01:00
parent 10af060ab4
commit 668560ff05
11 changed files with 344 additions and 111 deletions

View file

@ -359,6 +359,7 @@ macro(makeOutput outputName format)
endmacro() endmacro()
makeOutput(RTMP rtmp) makeOutput(RTMP rtmp)
makeOutput(DTSC dtsc)
makeOutput(OGG progressive_ogg http) makeOutput(OGG progressive_ogg http)
makeOutput(FLV progressive_flv http) makeOutput(FLV progressive_flv http)
makeOutput(HTTPMinimalServer http_minimalserver http) makeOutput(HTTPMinimalServer http_minimalserver http)

20
flow_input Normal file
View file

@ -0,0 +1,20 @@
- Construct input
- Parse arguments
- Stream wordt gelocked IFF !nolock
- Start .run()
- setup(): opent files/sockets/etc waar nodig
- set "isStream" naar true
- checkHeaderTimes(): delete .dtsh file als ouder dan input file
- readHeader(): lees header naar interne metadata
- parseHeader(): parse interne metadata
- convert indien geen stream, serve indien stream
serve:
-
stream:
- start buffer
- pull data in
- parse data to stream

View file

@ -9,6 +9,7 @@
char DTSC::Magic_Header[] = "DTSC"; char DTSC::Magic_Header[] = "DTSC";
char DTSC::Magic_Packet[] = "DTPD"; char DTSC::Magic_Packet[] = "DTPD";
char DTSC::Magic_Packet2[] = "DTP2"; char DTSC::Magic_Packet2[] = "DTP2";
char DTSC::Magic_Command[] = "DTCM";
DTSC::File::File() { DTSC::File::File() {
F = 0; F = 0;
@ -32,8 +33,7 @@ DTSC::File & DTSC::File::operator =(const File & rhs) {
if (rhs.myPack) { if (rhs.myPack) {
myPack = rhs.myPack; myPack = rhs.myPack;
} }
metaStorage = rhs.metaStorage; metadata = rhs.metadata;
metadata = metaStorage;
currtime = rhs.currtime; currtime = rhs.currtime;
lastreadpos = rhs.lastreadpos; lastreadpos = rhs.lastreadpos;
headerSize = rhs.headerSize; headerSize = rhs.headerSize;
@ -67,7 +67,7 @@ DTSC::File::File(std::string filename, bool create) {
} }
created = create; created = create;
if (!F) { if (!F) {
DEBUG_MSG(DLVL_ERROR, "Could not open file %s", filename.c_str()); HIGH_MSG("Could not open file %s", filename.c_str());
return; return;
} }
fseek(F, 0, SEEK_END); fseek(F, 0, SEEK_END);
@ -83,7 +83,7 @@ DTSC::File::File(std::string filename, bool create) {
return; return;
} }
if (memcmp(buffer, DTSC::Magic_Header, 4) != 0) { if (memcmp(buffer, DTSC::Magic_Header, 4) != 0) {
if (memcmp(buffer, DTSC::Magic_Packet2, 4) != 0 && memcmp(buffer, DTSC::Magic_Packet, 4) != 0) { if (memcmp(buffer, DTSC::Magic_Packet2, 4) != 0 && memcmp(buffer, DTSC::Magic_Packet, 4) != 0 && memcmp(buffer, DTSC::Magic_Command, 4) != 0) {
DEBUG_MSG(DLVL_ERROR, "%s is not a valid DTSC file", filename.c_str()); DEBUG_MSG(DLVL_ERROR, "%s is not a valid DTSC file", filename.c_str());
fclose(F); fclose(F);
F = 0; F = 0;
@ -113,8 +113,7 @@ DTSC::File::File(std::string filename, bool create) {
fseek(F, 0, SEEK_SET); fseek(F, 0, SEEK_SET);
File Fhead(filename + ".dtsh"); File Fhead(filename + ".dtsh");
if (Fhead) { if (Fhead) {
metaStorage = Fhead.metaStorage; metadata = Fhead.metadata;
metadata = metaStorage;
} }
} }
currframe = 0; currframe = 0;
@ -346,8 +345,9 @@ void DTSC::File::seekNext() {
} }
void DTSC::File::parseNext(){ void DTSC::File::parseNext(){
char header_buffer[4] = {0, 0, 0, 0};
lastreadpos = ftell(F); lastreadpos = ftell(F);
if (fread(buffer, 4, 1, F) != 1) { if (fread(header_buffer, 4, 1, F) != 1) {
if (feof(F)) { if (feof(F)) {
DEBUG_MSG(DLVL_DEVEL, "End of file reached @ %d", (int)lastreadpos); DEBUG_MSG(DLVL_DEVEL, "End of file reached @ %d", (int)lastreadpos);
} else { } else {
@ -356,55 +356,26 @@ void DTSC::File::parseNext(){
myPack.null(); myPack.null();
return; return;
} }
if (memcmp(buffer, DTSC::Magic_Header, 4) == 0) {
if (lastreadpos != 0) {
readHeader(lastreadpos);
std::string tmp = metaStorage.toNetPacked();
myPack.reInit(tmp.data(), tmp.size());
DEBUG_MSG(DLVL_DEVEL, "Read another header");
} else {
if (fread(buffer, 4, 1, F) != 1) {
DEBUG_MSG(DLVL_ERROR, "Could not read header size @ %d", (int)lastreadpos);
myPack.null();
return;
}
long packSize = ntohl(((unsigned long *)buffer)[0]);
std::string strBuffer = "DTSC";
strBuffer.append((char *)buffer, 4);
strBuffer.resize(packSize + 8);
if (fread((void *)(strBuffer.c_str() + 8), packSize, 1, F) != 1) {
DEBUG_MSG(DLVL_ERROR, "Could not read header @ %d", (int)lastreadpos);
myPack.null();
return;
}
myPack.reInit(strBuffer.data(), strBuffer.size());
}
return;
}
long long unsigned int version = 0; long long unsigned int version = 0;
if (memcmp(buffer, DTSC::Magic_Packet, 4) == 0) { if (memcmp(header_buffer, DTSC::Magic_Packet, 4) == 0 || memcmp(header_buffer, DTSC::Magic_Command, 4) == 0 || memcmp(header_buffer, DTSC::Magic_Header, 4) == 0) {
version = 1; version = 1;
} }
if (memcmp(buffer, DTSC::Magic_Packet2, 4) == 0) { if (memcmp(header_buffer, DTSC::Magic_Packet2, 4) == 0) {
version = 2; version = 2;
} }
if (version == 0) { if (version == 0) {
DEBUG_MSG(DLVL_ERROR, "Invalid packet header @ %#x - %.4s != %.4s @ %d", (unsigned int)lastreadpos, (char *)buffer, DTSC::Magic_Packet2, (int)lastreadpos); DEBUG_MSG(DLVL_ERROR, "Invalid packet header @ %#x: %.4s", (unsigned int)lastreadpos, (char *)buffer);
myPack.null(); myPack.null();
return; return;
} }
if (fread(buffer, 4, 1, F) != 1) { if (fread(buffer, 4, 1, F) != 1) {
DEBUG_MSG(DLVL_ERROR, "Could not read packet size @ %d", (int)lastreadpos); DEBUG_MSG(DLVL_ERROR, "Could not read packet size @ %#x", (unsigned int)lastreadpos);
myPack.null(); myPack.null();
return; return;
} }
long packSize = ntohl(((unsigned long *)buffer)[0]); long packSize = ntohl(((unsigned long *)buffer)[0]);
char * packBuffer = (char *)malloc(packSize + 8); char * packBuffer = (char *)malloc(packSize + 8);
if (version == 1) { memcpy(packBuffer, header_buffer, 4);
memcpy(packBuffer, "DTPD", 4);
} else {
memcpy(packBuffer, "DTP2", 4);
}
memcpy(packBuffer + 4, buffer, 4); memcpy(packBuffer + 4, buffer, 4);
if (fread((void *)(packBuffer + 8), packSize, 1, F) != 1) { if (fread((void *)(packBuffer + 8), packSize, 1, F) != 1) {
DEBUG_MSG(DLVL_ERROR, "Could not read packet @ %d", (int)lastreadpos); DEBUG_MSG(DLVL_ERROR, "Could not read packet @ %d", (int)lastreadpos);

View file

@ -34,6 +34,7 @@ namespace DTSC {
extern char Magic_Header[]; ///< The magic bytes for a DTSC header extern char Magic_Header[]; ///< The magic bytes for a DTSC header
extern char Magic_Packet[]; ///< The magic bytes for a DTSC packet extern char Magic_Packet[]; ///< The magic bytes for a DTSC packet
extern char Magic_Packet2[]; ///< The magic bytes for a DTSC packet version 2 extern char Magic_Packet2[]; ///< The magic bytes for a DTSC packet version 2
extern char Magic_Command[]; ///< The magic bytes for a DTCM packet
///\brief A simple structure used for ordering byte seek positions. ///\brief A simple structure used for ordering byte seek positions.
struct seekPos { struct seekPos {
@ -61,7 +62,8 @@ namespace DTSC {
DTSC_INVALID, DTSC_INVALID,
DTSC_HEAD, DTSC_HEAD,
DTSC_V1, DTSC_V1,
DTSC_V2 DTSC_V2,
DTCM
}; };
/// This class allows scanning through raw binary format DTSC data. /// This class allows scanning through raw binary format DTSC data.
@ -295,10 +297,10 @@ namespace DTSC {
void update(long long packTime, long long packOffset, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size = 5000); void update(long long packTime, long long packOffset, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size = 5000);
*/ */
void update(long long packTime, long long packOffset, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size = 5000, const char * iVec = 0); void update(long long packTime, long long packOffset, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size = 5000, const char * iVec = 0);
int getSendLen(); int getSendLen(bool skipDynamic = false);
void send(Socket::Connection & conn); void send(Socket::Connection & conn, bool skipDynamic = false);
void writeTo(char *& p); void writeTo(char *& p);
JSON::Value toJSON(bool skipBinary = false); JSON::Value toJSON(bool skipDynamic = false);
std::deque<Fragment> fragments; std::deque<Fragment> fragments;
std::deque<Key> keys; std::deque<Key> keys;
std::deque<unsigned long> keySizes; std::deque<unsigned long> keySizes;
@ -352,8 +354,8 @@ namespace DTSC {
void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000); void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000);
LTS*/ LTS*/
void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000, const char * iVec = 0); void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000, const char * iVec = 0);
unsigned int getSendLen(); unsigned int getSendLen(bool skipDynamic = false);
void send(Socket::Connection & conn); void send(Socket::Connection & conn, bool skipDynamic = false);
void writeTo(char * p); void writeTo(char * p);
JSON::Value toJSON(); JSON::Value toJSON();
void reset(); void reset();
@ -398,7 +400,6 @@ namespace DTSC {
long int endPos; long int endPos;
void readHeader(int pos); void readHeader(int pos);
DTSC::Packet myPack; DTSC::Packet myPack;
JSON::Value metaStorage;
Meta metadata; Meta metadata;
std::map<unsigned int, std::string> trackMapping; std::map<unsigned int, std::string> trackMapping;
long long int currtime; long long int currtime;

View file

@ -159,8 +159,12 @@ namespace DTSC {
if (!memcmp(data, Magic_Header, 4)) { if (!memcmp(data, Magic_Header, 4)) {
version = DTSC_HEAD; version = DTSC_HEAD;
} else { } else {
DEBUG_MSG(DLVL_FAIL, "ReInit received a packet with invalid header"); if (!memcmp(data, Magic_Command, 4)) {
return; version = DTCM;
} else {
DEBUG_MSG(DLVL_FAIL, "ReInit received a packet with invalid header");
return;
}
} }
} }
} }
@ -1510,15 +1514,17 @@ namespace DTSC {
} }
///\brief Determines the "packed" size of a track ///\brief Determines the "packed" size of a track
int Track::getSendLen() { int Track::getSendLen(bool skipDynamic) {
int result = 146 + init.size() + codec.size() + type.size() + getWritableIdentifier().size(); int result = 107 + init.size() + codec.size() + type.size() + getWritableIdentifier().size();
result += fragments.size() * PACKED_FRAGMENT_SIZE; if (!skipDynamic){
result += keys.size() * PACKED_KEY_SIZE; result += fragments.size() * PACKED_FRAGMENT_SIZE + 16;
if (keySizes.size()){ result += keys.size() * PACKED_KEY_SIZE + 11;
result += 11 + (keySizes.size() * 4) + 4; if (keySizes.size()){
result += (keySizes.size() * 4) + 15;
}
result += parts.size() * 9 + 12;
result += (ivecs.size() * 8) + 12; /*LTS*/
} }
result += parts.size() * 9;
result += (ivecs.size() * 8) + 12; /*LTS*/
if (type == "audio") { if (type == "audio") {
result += 49; result += 49;
} else if (type == "video") { } else if (type == "video") {
@ -1624,46 +1630,48 @@ namespace DTSC {
} }
///\brief Writes a track to a socket ///\brief Writes a track to a socket
void Track::send(Socket::Connection & conn) { void Track::send(Socket::Connection & conn, bool skipDynamic) {
conn.SendNow(convertShort(getWritableIdentifier().size()), 2); conn.SendNow(convertShort(getWritableIdentifier().size()), 2);
conn.SendNow(getWritableIdentifier()); conn.SendNow(getWritableIdentifier());
conn.SendNow("\340", 1);//Begin track object conn.SendNow("\340", 1);//Begin track object
conn.SendNow("\000\011fragments\002", 12); if (!skipDynamic){
conn.SendNow(convertInt(fragments.size() * PACKED_FRAGMENT_SIZE), 4); conn.SendNow("\000\011fragments\002", 12);
for (std::deque<Fragment>::iterator it = fragments.begin(); it != fragments.end(); it++) { conn.SendNow(convertInt(fragments.size() * PACKED_FRAGMENT_SIZE), 4);
conn.SendNow(it->getData(), PACKED_FRAGMENT_SIZE); for (std::deque<Fragment>::iterator it = fragments.begin(); it != fragments.end(); it++) {
conn.SendNow(it->getData(), PACKED_FRAGMENT_SIZE);
}
conn.SendNow("\000\004keys\002", 7);
conn.SendNow(convertInt(keys.size() * PACKED_KEY_SIZE), 4);
for (std::deque<Key>::iterator it = keys.begin(); it != keys.end(); it++) {
conn.SendNow(it->getData(), PACKED_KEY_SIZE);
}
conn.SendNow("\000\010keysizes\002,", 11);
conn.SendNow(convertInt(keySizes.size() * 4), 4);
std::string tmp;
tmp.reserve(keySizes.size() * 4);
for (unsigned int i = 0; i < keySizes.size(); i++){
tmp += (char)(keySizes[i] >> 24);
tmp += (char)(keySizes[i] >> 16);
tmp += (char)(keySizes[i] >> 8);
tmp += (char)(keySizes[i]);
}
conn.SendNow(tmp.data(), tmp.size());
conn.SendNow("\000\005parts\002", 8);
conn.SendNow(convertInt(parts.size() * 9), 4);
for (std::deque<Part>::iterator it = parts.begin(); it != parts.end(); it++) {
conn.SendNow(it->getData(), 9);
}
/*LTS-START*/
conn.SendNow("\000\005ivecs\002", 8);
conn.SendNow(convertInt(ivecs.size() * 8), 4);
for (std::deque<Ivec>::iterator it = ivecs.begin(); it != ivecs.end(); it++) {
conn.SendNow(it->getData(), 8);
}
/*LTS-END*/
} }
conn.SendNow("\000\004keys\002", 7);
conn.SendNow(convertInt(keys.size() * PACKED_KEY_SIZE), 4);
for (std::deque<Key>::iterator it = keys.begin(); it != keys.end(); it++) {
conn.SendNow(it->getData(), PACKED_KEY_SIZE);
}
conn.SendNow("\000\010keysizes\002,", 11);
conn.SendNow(convertInt(keySizes.size() * 4), 4);
std::string tmp;
tmp.reserve(keySizes.size() * 4);
for (unsigned int i = 0; i < keySizes.size(); i++){
tmp += (char)(keySizes[i] >> 24);
tmp += (char)(keySizes[i] >> 16);
tmp += (char)(keySizes[i] >> 8);
tmp += (char)(keySizes[i]);
}
conn.SendNow(tmp.data(), tmp.size());
conn.SendNow("\000\005parts\002", 8);
conn.SendNow(convertInt(parts.size() * 9), 4);
for (std::deque<Part>::iterator it = parts.begin(); it != parts.end(); it++) {
conn.SendNow(it->getData(), 9);
}
/*LTS-START*/
conn.SendNow("\000\005ivecs\002", 8);
conn.SendNow(convertInt(ivecs.size() * 8), 4);
for (std::deque<Ivec>::iterator it = ivecs.begin(); it != ivecs.end(); it++) {
conn.SendNow(it->getData(), 8);
}
/*LTS-END*/
conn.SendNow("\000\007trackid\001", 10); conn.SendNow("\000\007trackid\001", 10);
conn.SendNow(convertLongLong(trackID), 8); conn.SendNow(convertLongLong(trackID), 8);
if (missedFrags) { if (!skipDynamic && missedFrags) {
conn.SendNow("\000\014missed_frags\001", 15); conn.SendNow("\000\014missed_frags\001", 15);
conn.SendNow(convertLongLong(missedFrags), 8); conn.SendNow(convertLongLong(missedFrags), 8);
} }
@ -1701,10 +1709,10 @@ namespace DTSC {
} }
///\brief Determines the "packed" size of a meta object ///\brief Determines the "packed" size of a meta object
unsigned int Meta::getSendLen() { unsigned int Meta::getSendLen(bool skipDynamic) {
unsigned int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; unsigned int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21;
for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) { for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) {
dataLen += it->second.getSendLen(); dataLen += it->second.getSendLen(skipDynamic);
} }
return dataLen + 8; //add 8 bytes header return dataLen + 8; //add 8 bytes header
} }
@ -1741,13 +1749,13 @@ namespace DTSC {
} }
///\brief Writes a meta object to a socket ///\brief Writes a meta object to a socket
void Meta::send(Socket::Connection & conn) { void Meta::send(Socket::Connection & conn, bool skipDynamic) {
int dataLen = getSendLen() - 8; //strip 8 bytes header int dataLen = getSendLen(skipDynamic) - 8; //strip 8 bytes header
conn.SendNow(DTSC::Magic_Header, 4); conn.SendNow(DTSC::Magic_Header, 4);
conn.SendNow(convertInt(dataLen), 4); conn.SendNow(convertInt(dataLen), 4);
conn.SendNow("\340\000\006tracks\340", 10); conn.SendNow("\340\000\006tracks\340", 10);
for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) { for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) {
it->second.send(conn); it->second.send(conn, skipDynamic);
} }
conn.SendNow("\000\000\356", 3);//End tracks object conn.SendNow("\000\000\356", 3);//End tracks object
if (vod) { if (vod) {
@ -1772,10 +1780,10 @@ namespace DTSC {
} }
///\brief Converts a track to a JSON::Value ///\brief Converts a track to a JSON::Value
JSON::Value Track::toJSON(bool skipBinary) { JSON::Value Track::toJSON(bool skipDynamic) {
JSON::Value result; JSON::Value result;
std::string tmp; std::string tmp;
if (!skipBinary) { if (!skipDynamic) {
tmp.reserve(fragments.size() * PACKED_FRAGMENT_SIZE); tmp.reserve(fragments.size() * PACKED_FRAGMENT_SIZE);
for (std::deque<Fragment>::iterator it = fragments.begin(); it != fragments.end(); it++) { for (std::deque<Fragment>::iterator it = fragments.begin(); it != fragments.end(); it++) {
tmp.append(it->getData(), PACKED_FRAGMENT_SIZE); tmp.append(it->getData(), PACKED_FRAGMENT_SIZE);
@ -1810,8 +1818,8 @@ namespace DTSC {
} }
result["ivecs"] = tmp; result["ivecs"] = tmp;
/*LTS-END*/ /*LTS-END*/
result["init"] = init;
} }
result["init"] = init;
result["trackid"] = trackID; result["trackid"] = trackID;
result["firstms"] = (long long)firstms; result["firstms"] = (long long)firstms;
result["lastms"] = (long long)lastms; result["lastms"] = (long long)lastms;

View file

@ -23,7 +23,9 @@ namespace Analysers {
std::cerr << "Not a valid DTSC file" << std::endl; std::cerr << "Not a valid DTSC file" << std::endl;
return 1; return 1;
} }
F.getMeta().toPrettyString(std::cout,0, 0x03); if (F.getMeta().vod || F.getMeta().live){
F.getMeta().toPrettyString(std::cout,0, 0x03);
}
int bPos = 0; int bPos = 0;
F.seek_bpos(0); F.seek_bpos(0);
@ -42,6 +44,10 @@ namespace Analysers {
std::cout << "DTSC header: " << F.getPacket().getScan().toPrettyString() << std::endl; std::cout << "DTSC header: " << F.getPacket().getScan().toPrettyString() << std::endl;
break; break;
} }
case DTSC::DTCM: {
std::cout << "DTCM command: " << F.getPacket().getScan().toPrettyString() << std::endl;
break;
}
default: default:
DEBUG_MSG(DLVL_WARN,"Invalid dtsc packet @ bpos %d", bPos); DEBUG_MSG(DLVL_WARN,"Invalid dtsc packet @ bpos %d", bPos);
break; break;

View file

@ -240,6 +240,58 @@ namespace Mist {
//end player functionality //end player functionality
} }
/// Main loop for stream-style inputs.
/// This loop will start the buffer without resume support, and then repeatedly call ..... followed by ....
void Input::stream(){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
/*LTS-START*/
if(Triggers::shouldTrigger("STREAM_READY", config->getString("streamname"))){
std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n";
if (!Triggers::doTrigger("STREAM_READY", payload, config->getString("streamname"))){
config->is_active = false;
}
}
/*LTS-END*/
userPage.init(userPageName, PLAY_EX_SIZE, true);
if (!isBuffer) {
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
bufferFrame(it->first, 1);
}
}
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str());
long long int activityCounter = Util::bootSecs();
while ((Util::bootSecs() - activityCounter) < 10 && config->is_active) { //10 second timeout
userPage.parseEach(callbackWrapper);
removeUnused();
if (userPage.amount) {
activityCounter = Util::bootSecs();
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount);
} else {
DEBUG_MSG(DLVL_INSANE, "Timer running");
}
/*LTS-START*/
if ((Util::bootSecs() - activityCounter) >= 10 || !config->is_active){//10 second timeout
if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){
std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n";
if (!Triggers::doTrigger("STREAM_UNLOAD", payload, config->getString("streamname"))){
activityCounter = Util::bootSecs();
config->is_active = true;
}
}
}
/*LTS-END*/
if (config->is_active){
Util::sleep(1000);
}
}
finish();
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str());
//end player functionality
}
void Input::finish() { void Input::finish() {
for (std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) { for (std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) {
for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) { for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) {

View file

@ -338,10 +338,12 @@ namespace Mist {
} }
if (!found){ if (!found){
for (std::map<unsigned int,DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ for (std::map<unsigned int,DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){
if (trit->second.codec == (*itc).asStringRef()){ if (trit->second.codec == (*itc).asStringRef() || (*itc).asStringRef() == "*"){
genCounter++; genCounter++;
found = true; found = true;
break; if ((*itc).asStringRef() != "*"){
break;
}
} }
} }
} }
@ -368,6 +370,7 @@ namespace Mist {
if ((*itb).size() && myMeta.tracks.size()){ if ((*itb).size() && myMeta.tracks.size()){
bool found = false; bool found = false;
jsonForEach((*itb), itc) { jsonForEach((*itb), itc) {
INFO_MSG("Filling codec: '%s'", (*itc).asStringRef().c_str());
if (found) { if (found) {
break; break;
} }
@ -379,10 +382,12 @@ namespace Mist {
} }
if (!found){ if (!found){
for (std::map<unsigned int,DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ for (std::map<unsigned int,DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){
if (trit->second.codec == (*itc).asStringRef()){ if (trit->second.codec == (*itc).asStringRef() || (*itc).asStringRef() == "*"){
selectedTracks.insert(trit->first); selectedTracks.insert(trit->first);
found = true; found = true;
break; if ((*itc).asStringRef() != "*"){
break;
}
} }
} }
} }

147
src/output/output_dtsc.cpp Normal file
View file

@ -0,0 +1,147 @@
#include "output_dtsc.h"
#include <mist/defines.h>
#include <mist/stream.h>
#include <mist/triggers.h>
#include <mist/auth.h>
#include <mist/bitfields.h>
#include <sys/stat.h>
#include <cstring>
#include <cstdlib>
namespace Mist {
OutDTSC::OutDTSC(Socket::Connection & conn) : Output(conn) {
setBlocking(true);
JSON::Value prep;
prep["cmd"] = "hi";
prep["version"] = "MistServer " PACKAGE_VERSION;
#ifdef BIGMETA
prep["pack_method"] = 2ll;
#else
prep["pack_method"] = 1ll;
#endif
salt = Secure::md5("mehstuff"+JSON::Value((long long)time(0)).asString());
prep["salt"] = salt;
/// \todo Make this securererer.
unsigned long sendSize = prep.packedSize();
myConn.SendNow("DTCM");
char sSize[4] = {0, 0, 0, 0};
Bit::htobl(sSize, prep.packedSize());
myConn.SendNow(sSize, 4);
prep.sendTo(myConn);
pushing = false;
}
OutDTSC::~OutDTSC() {}
void OutDTSC::init(Util::Config * cfg){
Output::init(cfg);
capa["name"] = "DTSC";
capa["desc"] = "Enables the DTSC protocol for efficient inter-server stream exchange.";
capa["deps"] = "";
capa["codecs"][0u][0u].append("*");
cfg->addConnectorOptions(4200, capa);
config = cfg;
}
void OutDTSC::sendNext(){
myConn.SendNow(thisPacket.getData(), thisPacket.getDataLen());
}
void OutDTSC::sendHeader(){
sentHeader = true;
myMeta.send(myConn, true);
}
void OutDTSC::onRequest(){
while (myConn.Received().available(8)){
if (myConn.Received().copy(4) == "DTCM"){
// Command message
std::string toRec = myConn.Received().copy(8);
unsigned long rSize = Bit::btohl(toRec.c_str()+4);
if (!myConn.Received().available(8+rSize)){return;}//abort - not enough data yet
myConn.Received().remove(8);
std::string dataPacket = myConn.Received().remove(rSize);
DTSC::Scan dScan((char*)dataPacket.data(), rSize);
if (dScan.getMember("cmd").asString() == "push"){handlePush(dScan); continue;}
if (dScan.getMember("cmd").asString() == "play"){handlePlay(dScan); continue;}
WARN_MSG("Unhandled DTCM command: '%s'", dScan.getMember("cmd").asString().c_str());
}else{
// Non-command message
//
}
}
}
void OutDTSC::handlePlay(DTSC::Scan & dScan){
streamName = dScan.getMember("stream").asString();
Util::sanitizeName(streamName);
parseData = true;
}
void OutDTSC::handlePush(DTSC::Scan & dScan){
streamName = dScan.getMember("stream").asString();
std::string passString = dScan.getMember("password").asString();
Util::sanitizeName(streamName);
//pull the server configuration
std::string smp = streamName.substr(0,(streamName.find_first_of("+ ")));
IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE); ///< Contains server configuration and capabilities
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(smp);
if (streamCfg){
if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){
DEBUG_MSG(DLVL_FAIL, "Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str());
myConn.close();
}else{
std::string source = streamCfg.getMember("source").asString().substr(7);
std::string IP = source.substr(0, source.find('@'));
/*LTS-START*/
std::string password;
if (source.find('@') != std::string::npos){
password = source.substr(source.find('@')+1);
if (password != ""){
if (passString == Secure::md5(salt + password)){
DEBUG_MSG(DLVL_DEVEL, "Password accepted - ignoring IP settings.");
IP = "";
}else{
DEBUG_MSG(DLVL_DEVEL, "Password rejected - checking IP.");
if (IP == ""){
IP = "deny-all.invalid";
}
}
}
}
if(Triggers::shouldTrigger("STREAM_PUSH", smp)){
std::string payload = streamName+"\n" + myConn.getHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){
DEBUG_MSG(DLVL_FAIL, "Push from %s to %s rejected - STREAM_PUSH trigger denied the push", myConn.getHost().c_str(), streamName.c_str());
myConn.close();
configLock.post();
configLock.close();
return;
}
}
/*LTS-END*/
if (IP != ""){
if (!myConn.isAddress(IP)){
DEBUG_MSG(DLVL_FAIL, "Push from %s to %s rejected - source host not whitelisted", myConn.getHost().c_str(), streamName.c_str());
myConn.close();
}
}
}
}else{
DEBUG_MSG(DLVL_FAIL, "Push from %s rejected - stream '%s' not configured.", myConn.getHost().c_str(), streamName.c_str());
myConn.close();
}
configLock.post();
configLock.close();
if (!myConn){return;}//do not initialize if rejected
initialize();
pushing = true;
}
}

22
src/output/output_dtsc.h Normal file
View file

@ -0,0 +1,22 @@
#include "output.h"
namespace Mist {
class OutDTSC : public Output {
public:
OutDTSC(Socket::Connection & conn);
~OutDTSC();
static void init(Util::Config * cfg);
void onRequest();
void sendNext();
void sendHeader();
private:
std::string salt;
bool pushing;
void handlePush(DTSC::Scan & dScan);
void handlePlay(DTSC::Scan & dScan);
};
}
typedef Mist::OutDTSC mistOut;

View file

@ -4,11 +4,11 @@ namespace Mist {
OutRaw::OutRaw(Socket::Connection & conn) : Output(conn) { OutRaw::OutRaw(Socket::Connection & conn) : Output(conn) {
streamName = config->getString("streamname"); streamName = config->getString("streamname");
initialize(); initialize();
selectedTracks.clear();
std::string tracks = config->getString("tracks"); std::string tracks = config->getString("tracks");
unsigned int currTrack = 0; if (tracks.size()){
//loop over tracks, add any found track IDs to selectedTracks selectedTracks.clear();
if (tracks != ""){ unsigned int currTrack = 0;
//loop over tracks, add any found track IDs to selectedTracks
for (unsigned int i = 0; i < tracks.size(); ++i){ for (unsigned int i = 0; i < tracks.size(); ++i){
if (tracks[i] >= '0' && tracks[i] <= '9'){ if (tracks[i] >= '0' && tracks[i] <= '9'){
currTrack = currTrack*10 + (tracks[i] - '0'); currTrack = currTrack*10 + (tracks[i] - '0');
@ -46,8 +46,7 @@ namespace Mist {
capa["optional"]["seek"]["help"] = "The time in milliseconds to seek to, 0 by default."; capa["optional"]["seek"]["help"] = "The time in milliseconds to seek to, 0 by default.";
capa["optional"]["seek"]["type"] = "int"; capa["optional"]["seek"]["type"] = "int";
capa["optional"]["seek"]["option"] = "--seek"; capa["optional"]["seek"]["option"] = "--seek";
capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("*");
capa["codecs"][0u][1u].append("AAC");
cfg->addOption("streamname", cfg->addOption("streamname",
JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}")); JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}"));
cfg->addOption("tracks", cfg->addOption("tracks",
@ -68,3 +67,4 @@ namespace Mist {
} }
} }