Finish splitting controller into multiple files, universalised coding style across all files.

This commit is contained in:
Thulinma 2012-12-12 20:16:53 +01:00
parent 0db5f60b95
commit 0920b3259b
30 changed files with 1616 additions and 1246 deletions

View file

@ -11,7 +11,7 @@ SUBDIRS=converters analysers
bin_PROGRAMS=MistBuffer MistController MistConnRAW MistConnRTMP MistConnHTTP MistConnHTTPProgressive MistConnHTTPDynamic MistConnHTTPSmooth MistPlayer bin_PROGRAMS=MistBuffer MistController MistConnRAW MistConnRTMP MistConnHTTP MistConnHTTPProgressive MistConnHTTPDynamic MistConnHTTPSmooth MistPlayer
MistBuffer_SOURCES=buffer.cpp buffer_user.h buffer_user.cpp buffer_stream.h buffer_stream.cpp tinythread.cpp tinythread.h ../VERSION MistBuffer_SOURCES=buffer.cpp buffer_user.h buffer_user.cpp buffer_stream.h buffer_stream.cpp tinythread.cpp tinythread.h ../VERSION
MistBuffer_LDADD=$(MIST_LIBS) -lpthread MistBuffer_LDADD=$(MIST_LIBS) -lpthread
MistController_SOURCES=controller.cpp controller_connectors.h controller_connectors.cpp controller_storage.h controller_storage.cpp ../VERSION ./server.html.h MistController_SOURCES=controller.cpp controller_connectors.h controller_connectors.cpp controller_storage.h controller_storage.cpp controller_streams.h controller_streams.cpp controller_capabilities.h controller_capabilities.cpp ../VERSION ./server.html.h
MistConnRAW_SOURCES=conn_raw.cpp ../VERSION MistConnRAW_SOURCES=conn_raw.cpp ../VERSION
MistConnRTMP_SOURCES=conn_rtmp.cpp ../VERSION MistConnRTMP_SOURCES=conn_rtmp.cpp ../VERSION
MistConnHTTP_SOURCES=conn_http.cpp tinythread.cpp tinythread.h ../VERSION ./embed.js.h MistConnHTTP_SOURCES=conn_http.cpp tinythread.cpp tinythread.h ../VERSION ./embed.js.h

View file

@ -15,7 +15,9 @@ int main(int argc, char ** argv) {
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
std::string temp; std::string temp;
while (std::cin.good()){temp += std::cin.get();}//read all of std::cin to temp while (std::cin.good()){
temp += std::cin.get();
} //read all of std::cin to temp
temp.erase(temp.size() - 1, 1); //strip the invalid last character temp.erase(temp.size() - 1, 1); //strip the invalid last character
AMF::Object amfdata = AMF::parse(temp); //parse temp into an AMF::Object AMF::Object amfdata = AMF::parse(temp); //parse temp into an AMF::Object
amfdata.Print(); //pretty-print the object amfdata.Print(); //pretty-print the object

View file

@ -39,12 +39,18 @@ int main(int argc, char ** argv){
while ( !F.getJSON().isNull()){ while ( !F.getJSON().isNull()){
std::cout << F.getJSON().toPrettyString() << std::endl; std::cout << F.getJSON().toPrettyString() << std::endl;
nowpack = F.getJSON()["time"].asInt(); nowpack = F.getJSON()["time"].asInt();
if (firstpack == 0){firstpack = nowpack;} if (firstpack == 0){
firstpack = nowpack;
}
if (F.getJSON()["datatype"].asString() == "audio"){ if (F.getJSON()["datatype"].asString() == "audio"){
if (lastaudio != 0 && (nowpack - lastaudio) != 0){ if (lastaudio != 0 && (nowpack - lastaudio) != 0){
bps = F.getJSON()["data"].asString().size() / (nowpack - lastaudio); bps = F.getJSON()["data"].asString().size() / (nowpack - lastaudio);
if (bps < aud_min){aud_min = bps;} if (bps < aud_min){
if (bps > aud_max){aud_max = bps;} aud_min = bps;
}
if (bps > aud_max){
aud_max = bps;
}
} }
totalaudio += F.getJSON()["data"].asString().size(); totalaudio += F.getJSON()["data"].asString().size();
lastaudio = nowpack; lastaudio = nowpack;
@ -52,22 +58,34 @@ int main(int argc, char ** argv){
if (F.getJSON()["datatype"].asString() == "video"){ if (F.getJSON()["datatype"].asString() == "video"){
if (lastvideo != 0 && (nowpack - lastvideo) != 0){ if (lastvideo != 0 && (nowpack - lastvideo) != 0){
bps = F.getJSON()["data"].asString().size() / (nowpack - lastvideo); bps = F.getJSON()["data"].asString().size() / (nowpack - lastvideo);
if (bps < vid_min){vid_min = bps;} if (bps < vid_min){
if (bps > vid_max){vid_max = bps;} vid_min = bps;
}
if (bps > vid_max){
vid_max = bps;
}
} }
if (F.getJSON()["keyframe"].asInt() != 0){ if (F.getJSON()["keyframe"].asInt() != 0){
if (lastkey != 0){ if (lastkey != 0){
bps = nowpack - lastkey; bps = nowpack - lastkey;
if (bps < key_min){key_min = bps;} if (bps < key_min){
if (bps > key_max){key_max = bps;} key_min = bps;
}
if (bps > key_max){
key_max = bps;
}
} }
keyframes++; keyframes++;
lastkey = nowpack; lastkey = nowpack;
} }
if (F.getJSON()["offset"].asInt() != 0){ if (F.getJSON()["offset"].asInt() != 0){
bps = F.getJSON()["offset"].asInt(); bps = F.getJSON()["offset"].asInt();
if (bps < bfrm_min){bfrm_min = bps;} if (bps < bfrm_min){
if (bps > bfrm_max){bfrm_max = bps;} bfrm_min = bps;
}
if (bps > bfrm_max){
bfrm_max = bps;
}
} }
totalvideo += F.getJSON()["data"].asString().size(); totalvideo += F.getJSON()["data"].asString().size();
lastvideo = nowpack; lastvideo = nowpack;

View file

@ -17,7 +17,9 @@ int main(int argc, char ** argv) {
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
std::string temp; std::string temp;
while (std::cin.good()){temp += std::cin.get();}//read all of std::cin to temp while (std::cin.good()){
temp += std::cin.get();
} //read all of std::cin to temp
temp.erase(temp.size() - 1, 1); //strip the invalid last character temp.erase(temp.size() - 1, 1); //strip the invalid last character
MP4::Box mp4data; MP4::Box mp4data;

View file

@ -30,7 +30,9 @@ int Detail = 0;
/// Automatically skips 3073 bytes of handshake data. /// Automatically skips 3073 bytes of handshake data.
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.addOption("detail", JSON::fromString("{\"arg_num\":1, \"arg\":\"integer\", \"default\":0, \"help\":\"Bitmask, 1 = Reconstruct, 2 = Explicit media info, 4 = Verbose chunks\"}")); conf.addOption("detail",
JSON::fromString(
"{\"arg_num\":1, \"arg\":\"integer\", \"default\":0, \"help\":\"Bitmask, 1 = Reconstruct, 2 = Explicit media info, 4 = Verbose chunks\"}"));
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
Detail = conf.getInteger("detail"); Detail = conf.getInteger("detail");
@ -49,17 +51,19 @@ int main(int argc, char ** argv){
} }
std::string inbuffer; std::string inbuffer;
while (std::cin.good()){inbuffer += std::cin.get();}//read all of std::cin to temp while (std::cin.good()){
inbuffer += std::cin.get();
} //read all of std::cin to temp
inbuffer.erase(0, 3073); //strip the handshake part inbuffer.erase(0, 3073); //strip the handshake part
RTMPStream::Chunk next; RTMPStream::Chunk next;
FLV::Tag F; //FLV holder FLV::Tag F; //FLV holder
AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER); AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER);
AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER); AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
while (next.Parse(inbuffer)){ while (next.Parse(inbuffer)){
if ((Detail & DETAIL_VERBOSE) == DETAIL_VERBOSE){ if ((Detail & DETAIL_VERBOSE) == DETAIL_VERBOSE){
fprintf(stderr, "Chunk info: [%#2X] CS ID %u, timestamp %u, len %u, type ID %u, Stream ID %u\n", next.headertype, next.cs_id, next.timestamp, next.len, next.msg_type_id, next.msg_stream_id); fprintf(stderr, "Chunk info: [%#2X] CS ID %u, timestamp %u, len %u, type ID %u, Stream ID %u\n", next.headertype, next.cs_id, next.timestamp,
next.len, next.msg_type_id, next.msg_stream_id);
} }
switch (next.msg_type_id){ switch (next.msg_type_id){
case 0: //does not exist case 0: //does not exist
@ -106,7 +110,8 @@ int main(int argc, char ** argv){
fprintf(stderr, "CTRL: User control message: UNKNOWN %hu - %u\n", ucmtype, ntohl(*(unsigned int*)(next.data.c_str()+2))); fprintf(stderr, "CTRL: User control message: UNKNOWN %hu - %u\n", ucmtype, ntohl(*(unsigned int*)(next.data.c_str()+2)));
break; break;
} }
} break; }
break;
case 5: //window size of other end case 5: //window size of other end
RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str()); RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str());
RTMPStream::rec_window_at = RTMPStream::rec_cnt; RTMPStream::rec_window_at = RTMPStream::rec_cnt;
@ -158,7 +163,8 @@ int main(int argc, char ** argv){
amf3data = AMF::parse3(next.data); amf3data = AMF::parse3(next.data);
amf3data.Print(); amf3data.Print();
} }
} break; }
break;
case 18: { case 18: {
fprintf(stderr, "Received AFM0 data message (metadata):\n"); fprintf(stderr, "Received AFM0 data message (metadata):\n");
amfdata = AMF::parse(next.data); amfdata = AMF::parse(next.data);
@ -167,7 +173,8 @@ int main(int argc, char ** argv){
F.ChunkLoader(next); F.ChunkLoader(next);
std::cout.write(F.data, F.len); std::cout.write(F.data, F.len);
} }
} break; }
break;
case 19: case 19:
fprintf(stderr, "Received AFM0 shared object\n"); fprintf(stderr, "Received AFM0 shared object\n");
break; break;
@ -175,7 +182,8 @@ int main(int argc, char ** argv){
fprintf(stderr, "Received AFM0 command message:\n"); fprintf(stderr, "Received AFM0 command message:\n");
amfdata = AMF::parse(next.data); amfdata = AMF::parse(next.data);
std::cerr << amfdata.Print() << std::endl; std::cerr << amfdata.Print() << std::endl;
} break; }
break;
case 22: case 22:
fprintf(stderr, "Received aggregate message\n"); fprintf(stderr, "Received aggregate message\n");
break; break;

View file

@ -30,9 +30,10 @@ namespace Buffer{
return t.tv_sec * 1000 + t.tv_usec / 1000; return t.tv_sec * 1000 + t.tv_usec / 1000;
} //getNowMS } //getNowMS
void handleStats(void * empty){ void handleStats(void * empty){
if (empty != 0){return;} if (empty != 0){
return;
}
std::string double_newline = "\n\n"; std::string double_newline = "\n\n";
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
while (buffer_running){ while (buffer_running){
@ -86,31 +87,40 @@ namespace Buffer{
}else{ }else{
usr->Disconnect("Push denied - invalid IP address!"); usr->Disconnect("Push denied - invalid IP address!");
} }
} break; }
break;
case 'S': { //Stats case 'S': { //Stats
usr->tmpStats = Stats(usr->S.Received().get().substr(2)); usr->tmpStats = Stats(usr->S.Received().get().substr(2));
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
if (secs < 1){secs = 1;} if (secs < 1){
secs = 1;
}
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs;
usr->lastStats = usr->tmpStats; usr->lastStats = usr->tmpStats;
thisStream->saveStats(usr->MyStr, usr->tmpStats); thisStream->saveStats(usr->MyStr, usr->tmpStats);
} break; }
break;
case 's': { //second-seek case 's': { //second-seek
//ignored for now //ignored for now
} break; }
break;
case 'f': { //frame-seek case 'f': { //frame-seek
//ignored for now //ignored for now
} break; }
break;
case 'p': { //play case 'p': { //play
//ignored for now //ignored for now
} break; }
break;
case 'o': { //once-play case 'o': { //once-play
//ignored for now //ignored for now
} break; }
break;
case 'q': { //quit-playing case 'q': { //quit-playing
//ignored for now //ignored for now
} break; }
break;
} }
} }
} }
@ -120,7 +130,9 @@ namespace Buffer{
/// Loop reading DTSC data from stdin and processing it at the correct speed. /// Loop reading DTSC data from stdin and processing it at the correct speed.
void handleStdin(void * empty){ void handleStdin(void * empty){
if (empty != 0){return;} if (empty != 0){
return;
}
long long int timeDiff = 0; //difference between local time and stream time long long int timeDiff = 0; //difference between local time and stream time
unsigned int lastPacket = 0; //last parsed packet timestamp unsigned int lastPacket = 0; //last parsed packet timestamp
std::string inBuffer; std::string inBuffer;
@ -157,7 +169,9 @@ namespace Buffer{
/// Loop reading DTSC data from an IP push address. /// Loop reading DTSC data from an IP push address.
/// No changes to the speed are made. /// No changes to the speed are made.
void handlePushin(void * empty){ void handlePushin(void * empty){
if (empty != 0){return;} if (empty != 0){
return;
}
while (buffer_running){ while (buffer_running){
if (thisStream->getIPInput().connected()){ if (thisStream->getIPInput().connected()){
if (thisStream->getIPInput().spool()){ if (thisStream->getIPInput().spool()){
@ -182,9 +196,13 @@ namespace Buffer{
/// Starts a loop, waiting for connections to send data to. /// Starts a loop, waiting for connections to send data to.
int Start(int argc, char ** argv){ int Start(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.addOption("stream_name", JSON::fromString("{\"arg_num\":1, \"arg\":\"string\", \"help\":\"Name of the stream this buffer will be providing.\"}")); conf.addOption("stream_name",
conf.addOption("awaiting_ip", JSON::fromString("{\"arg_num\":2, \"arg\":\"string\", \"default\":\"\", \"help\":\"IP address to expect incoming data from. This will completely disable reading from standard input if used.\"}")); JSON::fromString("{\"arg_num\":1, \"arg\":\"string\", \"help\":\"Name of the stream this buffer will be providing.\"}"));
conf.addOption("reportstats", JSON::fromString("{\"default\":0, \"help\":\"Report stats to a controller process.\", \"short\":\"s\", \"long\":\"reportstats\"}")); conf.addOption("awaiting_ip",
JSON::fromString(
"{\"arg_num\":2, \"arg\":\"string\", \"default\":\"\", \"help\":\"IP address to expect incoming data from. This will completely disable reading from standard input if used.\"}"));
conf.addOption("reportstats",
JSON::fromString("{\"default\":0, \"help\":\"Report stats to a controller process.\", \"short\":\"s\", \"long\":\"reportstats\"}"));
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
std::string name = conf.getString("stream_name"); std::string name = conf.getString("stream_name");
@ -201,7 +219,9 @@ namespace Buffer{
Socket::Connection std_input(fileno(stdin)); Socket::Connection std_input(fileno(stdin));
tthread::thread * StatsThread = 0; tthread::thread * StatsThread = 0;
if (conf.getBool("reportstats")){StatsThread = new tthread::thread(handleStats, 0);} if (conf.getBool("reportstats")){
StatsThread = new tthread::thread(handleStats, 0);
}
tthread::thread * StdinThread = 0; tthread::thread * StdinThread = 0;
std::string await_ip = conf.getString("awaiting_ip"); std::string await_ip = conf.getString("awaiting_ip");
if (await_ip == ""){ if (await_ip == ""){
@ -230,14 +250,18 @@ namespace Buffer{
StatsThread->join(); StatsThread->join();
delete StatsThread; delete StatsThread;
} }
if (thisStream->getIPInput().connected()){thisStream->getIPInput().close();} if (thisStream->getIPInput().connected()){
thisStream->getIPInput().close();
}
StdinThread->join(); StdinThread->join();
delete StdinThread; delete StdinThread;
delete thisStream; delete thisStream;
return 0; return 0;
} }
};//Buffer namespace }
;
//Buffer namespace
/// Entry point for Buffer, simply calls Buffer::Start(). /// Entry point for Buffer, simply calls Buffer::Start().
int main(int argc, char ** argv){ int main(int argc, char ** argv){

View file

@ -13,7 +13,9 @@ Buffer::Stream * Buffer::Stream::get(){
if (ref == 0){ if (ref == 0){
//prevent creating two at the same time //prevent creating two at the same time
creator.lock(); creator.lock();
if (ref == 0){ref = new Stream();} if (ref == 0){
ref = new Stream();
}
creator.unlock(); creator.unlock();
} }
return ref; return ref;
@ -62,8 +64,12 @@ std::string & Buffer::Stream::getStats(){
Storage["totals"]["now"] = now; Storage["totals"]["now"] = now;
Storage["buffer"] = name; Storage["buffer"] = name;
Storage["meta"] = Strm->metadata; Storage["meta"] = Strm->metadata;
if (Storage["meta"].isMember("audio")){Storage["meta"]["audio"].removeMember("init");} if (Storage["meta"].isMember("audio")){
if (Storage["meta"].isMember("video")){Storage["meta"]["video"].removeMember("init");} Storage["meta"]["audio"].removeMember("init");
}
if (Storage["meta"].isMember("video")){
Storage["meta"]["video"].removeMember("init");
}
ret = Storage.toString(); ret = Storage.toString();
Storage["log"].null(); Storage["log"].null();
stats_mutex.unlock(); stats_mutex.unlock();
@ -115,7 +121,6 @@ Socket::Connection & Buffer::Stream::getIPInput(){
return ip_input; return ip_input;
} }
/// Stores intermediate statistics. /// Stores intermediate statistics.
void Buffer::Stream::saveStats(std::string username, Stats & stats){ void Buffer::Stream::saveStats(std::string username, Stats & stats){
stats_mutex.lock(); stats_mutex.lock();
@ -134,7 +139,8 @@ void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string
if (Storage["curr"].isMember(username)){ if (Storage["curr"].isMember(username)){
Storage["curr"].removeMember(username); Storage["curr"].removeMember(username);
#if DEBUG >= 4 #if DEBUG >= 4
std::cout << "Disconnected user " << username << ": " << reason << ". " << stats.connector << " transferred " << stats.up << " up and " << stats.down << " down in " << stats.conntime << " seconds to " << stats.host << std::endl; std::cout << "Disconnected user " << username << ": " << reason << ". " << stats.connector << " transferred " << stats.up << " up and "
<< stats.down << " down in " << stats.conntime << " seconds to " << stats.host << std::endl;
#endif #endif
} }
Storage["log"][username]["connector"] = stats.connector; Storage["log"][username]["connector"] = stats.connector;
@ -190,7 +196,9 @@ void Buffer::Stream::dropWriteLock(bool newpackets_available){
writers--; writers--;
rw_mutex.unlock(); rw_mutex.unlock();
rw_change.notify_all(); rw_change.notify_all();
if (newpackets_available){moreData.notify_all();} if (newpackets_available){
moreData.notify_all();
}
} }
/// Blocks until reading is safe. /// Blocks until reading is safe.

View file

@ -3,7 +3,9 @@
#pragma once #pragma once
#include <string> #include <string>
#include <mist/dtsc.h>
#include <mist/json.h> #include <mist/json.h>
#include <mist/socket.h>
#include "tinythread.h" #include "tinythread.h"
#include "buffer_user.h" #include "buffer_user.h"
@ -70,4 +72,5 @@ namespace Buffer{
std::string name; ///< Name for this buffer. std::string name; ///< Name for this buffer.
tthread::condition_variable moreData; ///< Triggered when more data becomes available. tthread::condition_variable moreData; ///< Triggered when more data becomes available.
}; };
}; }
;

View file

@ -5,7 +5,6 @@
#include "buffer_stream.h" #include "buffer_stream.h"
#include <sstream> #include <sstream>
#include <stdlib.h> //for atoi and friends #include <stdlib.h> //for atoi and friends
int Buffer::user::UserCount = 0; int Buffer::user::UserCount = 0;
/// Creates a new user from a newly connected socket. /// Creates a new user from a newly connected socket.
@ -21,6 +20,8 @@ Buffer::user::user(Socket::Connection fd){
currsend = 0; currsend = 0;
myRing = 0; myRing = 0;
Thread = 0; Thread = 0;
gotproperaudio = false;
lastpointer = 0;
} //constructor } //constructor
/// Drops held DTSC::Ring class, if one is held. /// Drops held DTSC::Ring class, if one is held.
@ -31,17 +32,23 @@ Buffer::user::~user(){
/// Disconnects the current user. Doesn't do anything if already disconnected. /// Disconnects the current user. Doesn't do anything if already disconnected.
/// Prints "Disconnected user" to stdout if disconnect took place. /// Prints "Disconnected user" to stdout if disconnect took place.
void Buffer::user::Disconnect(std::string reason){ void Buffer::user::Disconnect(std::string reason){
if (S.connected()){S.close();} if (S.connected()){
S.close();
}
Stream::get()->clearStats(MyStr, lastStats, reason); Stream::get()->clearStats(MyStr, lastStats, reason);
} //Disconnect } //Disconnect
/// Tries to send the current buffer, returns true if success, false otherwise. /// Tries to send the current buffer, returns true if success, false otherwise.
/// Has a side effect of dropping the connection if send will never complete. /// Has a side effect of dropping the connection if send will never complete.
bool Buffer::user::doSend(const char * ptr, int len){ bool Buffer::user::doSend(const char * ptr, int len){
if (!len){return false;}//do not do empty sends if ( !len){
return false;
} //do not do empty sends
int r = S.iwrite(ptr + currsend, len - currsend); int r = S.iwrite(ptr + currsend, len - currsend);
if (r <= 0){ if (r <= 0){
if (errno == EWOULDBLOCK){return false;} if (errno == EWOULDBLOCK){
return false;
}
Disconnect(S.getError()); Disconnect(S.getError());
return false; return false;
} }
@ -51,8 +58,12 @@ bool Buffer::user::doSend(const char * ptr, int len){
/// Try to send data to this user. Disconnects if any problems occur. /// Try to send data to this user. Disconnects if any problems occur.
void Buffer::user::Send(){ void Buffer::user::Send(){
if (!myRing){return;}//no ring! if ( !myRing){
if (!S.connected()){return;}//cancel if not connected return;
} //no ring!
if ( !S.connected()){
return;
} //cancel if not connected
if (myRing->waiting){ if (myRing->waiting){
Stream::get()->waitForData(); Stream::get()->waitForData();
return; return;
@ -69,7 +80,10 @@ void Buffer::user::Send(){
if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){ if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){
//switch to next buffer //switch to next buffer
currsend = 0; currsend = 0;
if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode. if (myRing->b <= 0){
myRing->waiting = true;
return;
} //no next buffer? go in waiting mode.
myRing->b--; myRing->b--;
} //completed a send } //completed a send
Stream::get()->dropReadLock(); Stream::get()->dropReadLock();

View file

@ -14,7 +14,6 @@
#include <mist/socket.h> #include <mist/socket.h>
#include <mist/http_parser.h> #include <mist/http_parser.h>
#include <mist/config.h> #include <mist/config.h>
#include <mist/procs.h>
#include <mist/stream.h> #include <mist/stream.h>
#include <mist/timing.h> #include <mist/timing.h>
#include <mist/auth.h> #include <mist/auth.h>
@ -34,12 +33,14 @@ namespace Connector_HTTP{
ConnConn(){ ConnConn(){
conn = 0; conn = 0;
lastuse = 0; lastuse = 0;
}; }
;
/// Constructor that sets lastuse to 0, but socket to s. /// Constructor that sets lastuse to 0, but socket to s.
ConnConn(Socket::Connection * s){ ConnConn(Socket::Connection * s){
conn = s; conn = s;
lastuse = 0; lastuse = 0;
}; }
;
/// Destructor that deletes the socket if non-null. /// Destructor that deletes the socket if non-null.
~ConnConn(){ ~ConnConn(){
if (conn){ if (conn){
@ -47,7 +48,8 @@ namespace Connector_HTTP{
delete conn; delete conn;
} }
conn = 0; conn = 0;
}; }
;
}; };
std::map<std::string, ConnConn *> connconn; ///< Connections to connectors std::map<std::string, ConnConn *> connconn; ///< Connections to connectors
@ -75,7 +77,9 @@ namespace Connector_HTTP{
delete it->second; delete it->second;
connconn.erase(it); connconn.erase(it);
it = connconn.begin(); //get a valid iterator it = connconn.begin(); //get a valid iterator
if (it == connconn.end()){return;} if (it == connconn.end()){
return;
}
} }
} }
} }
@ -89,14 +93,16 @@ namespace Connector_HTTP{
void Handle_None(HTTP::Parser & H, Socket::Connection * conn){ void Handle_None(HTTP::Parser & H, Socket::Connection * conn){
H.Clean(); H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody("<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>"); H.SetBody(
"<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>");
conn->SendNow(H.BuildResponse("415", "Unsupported Media Type")); conn->SendNow(H.BuildResponse("415", "Unsupported Media Type"));
} }
void Handle_Timeout(HTTP::Parser & H, Socket::Connection * conn){ void Handle_Timeout(HTTP::Parser & H, Socket::Connection * conn){
H.Clean(); H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody("<!DOCTYPE html><html><head><title>Gateway timeout</title></head><body><h1>Gateway timeout</h1>Though the server understood your request and attempted to handle it, somehow handling it took longer than it should. Your request has been cancelled - please try again later.</body></html>"); H.SetBody(
"<!DOCTYPE html><html><head><title>Gateway timeout</title></head><body><h1>Gateway timeout</h1>Though the server understood your request and attempted to handle it, somehow handling it took longer than it should. Your request has been cancelled - please try again later.</body></html>");
conn->SendNow(H.BuildResponse("504", "Gateway Timeout")); conn->SendNow(H.BuildResponse("504", "Gateway Timeout"));
} }
@ -109,7 +115,8 @@ namespace Connector_HTTP{
H.Clean(); H.Clean();
H.SetHeader("Content-Type", "text/xml"); H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><allow-access-from domain=\"*\" /><site-control permitted-cross-domain-policies=\"all\"/></cross-domain-policy>"); H.SetBody(
"<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><allow-access-from domain=\"*\" /><site-control permitted-cross-domain-policies=\"all\"/></cross-domain-policy>");
conn->SendNow(H.BuildResponse("200", "OK")); conn->SendNow(H.BuildResponse("200", "OK"));
return; return;
} //crossdomain.xml } //crossdomain.xml
@ -118,12 +125,14 @@ namespace Connector_HTTP{
H.Clean(); H.Clean();
H.SetHeader("Content-Type", "text/xml"); H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody("<?xml version=\"1.0\" encoding=\"utf-8\"?><access-policy><cross-domain-access><policy><allow-from http-methods=\"*\" http-request-headers=\"*\"><domain uri=\"*\"/></allow-from><grant-to><resource path=\"/\" include-subpaths=\"true\"/></grant-to></policy></cross-domain-access></access-policy>"); H.SetBody(
"<?xml version=\"1.0\" encoding=\"utf-8\"?><access-policy><cross-domain-access><policy><allow-from http-methods=\"*\" http-request-headers=\"*\"><domain uri=\"*\"/></allow-from><grant-to><resource path=\"/\" include-subpaths=\"true\"/></grant-to></policy></cross-domain-access></access-policy>");
conn->SendNow(H.BuildResponse("200", "OK")); conn->SendNow(H.BuildResponse("200", "OK"));
return; return;
} //clientaccesspolicy.xml } //clientaccesspolicy.xml
if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js") || (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js")){ if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js")
|| (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js")){
std::string streamname; std::string streamname;
if (url.substr(0, 6) == "/info_"){ if (url.substr(0, 6) == "/info_"){
streamname = url.substr(6, url.length() - 9); streamname = url.substr(6, url.length() - 9);
@ -134,7 +143,9 @@ namespace Connector_HTTP{
JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist"); JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist");
std::string response; std::string response;
std::string host = H.GetHeader("Host"); std::string host = H.GetHeader("Host");
if (host.find(':')){host.resize(host.find(':'));} if (host.find(':')){
host.resize(host.find(':'));
}
H.Clean(); H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetHeader("Content-Type", "application/javascript"); H.SetHeader("Content-Type", "application/javascript");
@ -201,7 +212,9 @@ namespace Connector_HTTP{
//check if a connection exists, and if not create one //check if a connection exists, and if not create one
conn_mutex.lock(); conn_mutex.lock();
if ( !connconn.count(uid) || !connconn[uid]->conn->connected()){ if ( !connconn.count(uid) || !connconn[uid]->conn->connected()){
if (connconn.count(uid)){connconn.erase(uid);} if (connconn.count(uid)){
connconn.erase(uid);
}
connconn[uid] = new ConnConn(new Socket::Connection("/tmp/mist/http_" + connector)); connconn[uid] = new ConnConn(new Socket::Connection("/tmp/mist/http_" + connector));
connconn[uid]->conn->setBlocking(false); //do not block on spool() with no data connconn[uid]->conn->setBlocking(false); //do not block on spool() with no data
#if DEBUG >= 4 #if DEBUG >= 4
@ -330,10 +343,18 @@ namespace Connector_HTTP{
return "progressive"; return "progressive";
} }
} }
if (url == "/crossdomain.xml"){return "internal";} if (url == "/crossdomain.xml"){
if (url == "/clientaccesspolicy.xml"){return "internal";} return "internal";
if (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js"){return "internal";} }
if (url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js"){return "internal";} if (url == "/clientaccesspolicy.xml"){
return "internal";
}
if (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js"){
return "internal";
}
if (url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js"){
return "internal";
}
return "none"; return "none";
} }
@ -358,7 +379,8 @@ namespace Connector_HTTP{
std::string handler = getHTTPType(Client); std::string handler = getHTTPType(Client);
long long int startms = Util::getMS(); long long int startms = Util::getMS();
#if DEBUG >= 4 #if DEBUG >= 4
std::cout << "Received request: " << Client.getUrl() << " (" << conn->getSocket() << ") => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl; std::cout << "Received request: " << Client.getUrl() << " (" << conn->getSocket() << ") => " << handler << " (" << Client.GetVar("stream")
<< ")" << std::endl;
#endif #endif
if (handler == "none" || handler == "internal"){ if (handler == "none" || handler == "internal"){
if (handler == "internal"){ if (handler == "internal"){
@ -394,15 +416,16 @@ namespace Connector_HTTP{
thread_mutex.unlock(); thread_mutex.unlock();
} }
};//Connector_HTTP namespace } //Connector_HTTP namespace
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION); Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(8080); conf.addConnectorOptions(8080);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
Socket::Server server_socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface")); Socket::Server server_socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"));
if (!server_socket.connected()){return 1;} if ( !server_socket.connected()){
return 1;
}
conf.activate(); conf.activate();
while (server_socket.connected() && conf.is_active){ while (server_socket.connected() && conf.is_active){
@ -425,6 +448,24 @@ int main(int argc, char ** argv){
} }
} //while connected and not requested to stop } //while connected and not requested to stop
server_socket.close(); server_socket.close();
Util::Procs::StopAll();
//wait for existing connections to drop
bool repeat = true;
while (repeat){
Connector_HTTP::thread_mutex.lock();
repeat = !Connector_HTTP::active_threads.empty();
if (repeat){
usleep(100000); //sleep 100ms
}
//clean up any threads that may have finished
while ( !Connector_HTTP::done_threads.empty()){
tthread::thread * T = *Connector_HTTP::done_threads.begin();
T->join();
Connector_HTTP::done_threads.erase(T);
delete T;
}
Connector_HTTP::thread_mutex.unlock();
}
return 0; return 0;
} //main } //main

View file

@ -43,7 +43,6 @@ namespace Connector_HTTP{
asrt.setSegmentRun(1, metadata["keytime"].size(), 0); asrt.setSegmentRun(1, metadata["keytime"].size(), 0);
} }
MP4::AFRT afrt; MP4::AFRT afrt;
if (starttime == 0){ if (starttime == 0){
afrt.setUpdate(false); afrt.setUpdate(false);
@ -116,12 +115,12 @@ namespace Connector_HTTP{
return std::string((char*)abst.asBox(), (int)abst.boxedSize()); return std::string((char*)abst.asBox(), (int)abst.boxedSize());
} }
/// Returns a F4M-format manifest file /// Returns a F4M-format manifest file
std::string BuildManifest(std::string & MovieId, JSON::Value & metadata){ std::string BuildManifest(std::string & MovieId, JSON::Value & metadata){
std::string Result; std::string Result;
if (metadata.isMember("length") && metadata["length"].asInt() > 0){ if (metadata.isMember("length") && metadata["length"].asInt() > 0){
Result="<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" Result =
"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
"<manifest xmlns=\"http://ns.adobe.com/f4m/1.0\">\n" "<manifest xmlns=\"http://ns.adobe.com/f4m/1.0\">\n"
"<id>" + MovieId + "</id>\n" "<id>" + MovieId + "</id>\n"
"<width>" + metadata["video"]["width"].asString() + "</width>\n" "<width>" + metadata["video"]["width"].asString() + "</width>\n"
@ -130,8 +129,10 @@ namespace Connector_HTTP{
"<mimeType>video/mp4</mimeType>\n" "<mimeType>video/mp4</mimeType>\n"
"<streamType>recorded</streamType>\n" "<streamType>recorded</streamType>\n"
"<deliveryType>streaming</deliveryType>\n" "<deliveryType>streaming</deliveryType>\n"
"<bootstrapInfo profile=\"named\" id=\"bootstrap1\">" + Base64::encode(GenerateBootstrap(MovieId, metadata, 1, 0, 0)) + "</bootstrapInfo>\n" "<bootstrapInfo profile=\"named\" id=\"bootstrap1\">" + Base64::encode(GenerateBootstrap(MovieId, metadata, 1, 0, 0))
"<media streamId=\"1\" bootstrapInfoId=\"bootstrap1\" url=\"" + MovieId + "/\">\n" + "</bootstrapInfo>\n"
"<media streamId=\"1\" bootstrapInfoId=\"bootstrap1\" url=\"" + MovieId
+ "/\">\n"
"<metadata>AgAKb25NZXRhRGF0YQgAAAAAAAl0cmFja2luZm8KAAAAAgMACXRpbWVzY2FsZQBA+GoAAAAAAAAGbGVuZ3RoAEGMcHoQAAAAAAhsYW5ndWFnZQIAA2VuZwARc2FtcGxlZGVzY3JpcHRpb24KAAAAAQMACnNhbXBsZXR5cGUCAARhdmMxAAAJAAAJAwAJdGltZXNjYWxlAEDncAAAAAAAAAZsZW5ndGgAQXtNvTAAAAAACGxhbmd1YWdlAgADZW5nABFzYW1wbGVkZXNjcmlwdGlvbgoAAAABAwAKc2FtcGxldHlwZQIABG1wNGEAAAkAAAkADWF1ZGlvY2hhbm5lbHMAQAAAAAAAAAAAD2F1ZGlvc2FtcGxlcmF0ZQBA53AAAAAAAAAOdmlkZW9mcmFtZXJhdGUAQDf/gi5SciUABmFhY2FvdABAAAAAAAAAAAAIYXZjbGV2ZWwAQD8AAAAAAAAACmF2Y3Byb2ZpbGUAQFNAAAAAAAAADGF1ZGlvY29kZWNpZAIABG1wNGEADHZpZGVvY29kZWNpZAIABGF2YzEABXdpZHRoAECQ4AAAAAAAAAZoZWlnaHQAQIMAAAAAAAAACmZyYW1lV2lkdGgAQJDgAAAAAAAAC2ZyYW1lSGVpZ2h0AECDAAAAAAAAAAxkaXNwbGF5V2lkdGgAQJDgAAAAAAAADWRpc3BsYXlIZWlnaHQAQIMAAAAAAAAADG1vb3Zwb3NpdGlvbgBBmxq2uAAAAAAIZHVyYXRpb24AQIKjqW3oyhIAAAk=</metadata>\n" "<metadata>AgAKb25NZXRhRGF0YQgAAAAAAAl0cmFja2luZm8KAAAAAgMACXRpbWVzY2FsZQBA+GoAAAAAAAAGbGVuZ3RoAEGMcHoQAAAAAAhsYW5ndWFnZQIAA2VuZwARc2FtcGxlZGVzY3JpcHRpb24KAAAAAQMACnNhbXBsZXR5cGUCAARhdmMxAAAJAAAJAwAJdGltZXNjYWxlAEDncAAAAAAAAAZsZW5ndGgAQXtNvTAAAAAACGxhbmd1YWdlAgADZW5nABFzYW1wbGVkZXNjcmlwdGlvbgoAAAABAwAKc2FtcGxldHlwZQIABG1wNGEAAAkAAAkADWF1ZGlvY2hhbm5lbHMAQAAAAAAAAAAAD2F1ZGlvc2FtcGxlcmF0ZQBA53AAAAAAAAAOdmlkZW9mcmFtZXJhdGUAQDf/gi5SciUABmFhY2FvdABAAAAAAAAAAAAIYXZjbGV2ZWwAQD8AAAAAAAAACmF2Y3Byb2ZpbGUAQFNAAAAAAAAADGF1ZGlvY29kZWNpZAIABG1wNGEADHZpZGVvY29kZWNpZAIABGF2YzEABXdpZHRoAECQ4AAAAAAAAAZoZWlnaHQAQIMAAAAAAAAACmZyYW1lV2lkdGgAQJDgAAAAAAAAC2ZyYW1lSGVpZ2h0AECDAAAAAAAAAAxkaXNwbGF5V2lkdGgAQJDgAAAAAAAADWRpc3BsYXlIZWlnaHQAQIMAAAAAAAAADG1vb3Zwb3NpdGlvbgBBmxq2uAAAAAAIZHVyYXRpb24AQIKjqW3oyhIAAAk=</metadata>\n"
"</media>\n" "</media>\n"
"</manifest>\n"; "</manifest>\n";
@ -232,7 +233,9 @@ namespace Connector_HTTP{
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml"); HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){receive_marks = true;} if (Strm.metadata.isMember("length")){
receive_marks = true;
}
std::string manifest = BuildManifest(streamname, Strm.metadata); std::string manifest = BuildManifest(streamname, Strm.metadata);
HTTP_S.SetBody(manifest); HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
@ -296,7 +299,9 @@ namespace Connector_HTTP{
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml"); HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){receive_marks = true;} if (Strm.metadata.isMember("length")){
receive_marks = true;
}
std::string manifest = BuildManifest(streamname, Strm.metadata); std::string manifest = BuildManifest(streamname, Strm.metadata);
HTTP_S.SetBody(manifest); HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
@ -305,7 +310,9 @@ namespace Connector_HTTP{
#endif #endif
pending_manifest = false; pending_manifest = false;
} }
if (!receive_marks && Strm.metadata.isMember("length")){receive_marks = true;} if ( !receive_marks && Strm.metadata.isMember("length")){
receive_marks = true;
}
if ((Strm.getPacket(0).isMember("keyframe") && !receive_marks) || Strm.lastType() == DTSC::PAUSEMARK){ if ((Strm.getPacket(0).isMember("keyframe") && !receive_marks) || Strm.lastType() == DTSC::PAUSEMARK){
#if DEBUG >= 4 #if DEBUG >= 4
fprintf(stderr, "Received a %s fragment of %i bytes.\n", Strm.getPacket(0)["datatype"].asString().c_str(), FlashBufSize); fprintf(stderr, "Received a %s fragment of %i bytes.\n", Strm.getPacket(0)["datatype"].asString().c_str(), FlashBufSize);
@ -375,7 +382,9 @@ namespace Connector_HTTP{
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml"); HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){receive_marks = true;} if (Strm.metadata.isMember("length")){
receive_marks = true;
}
std::string manifest = BuildManifest(streamname, Strm.metadata); std::string manifest = BuildManifest(streamname, Strm.metadata);
HTTP_S.SetBody(manifest); HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
@ -385,14 +394,18 @@ namespace Connector_HTTP{
pending_manifest = false; pending_manifest = false;
} }
} }
if (!ss.connected()){break;} if ( !ss.connected()){
break;
}
} }
} }
conn.close(); conn.close();
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str()); ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
ss.close(); ss.close();
#if DEBUG >= 1 #if DEBUG >= 1
if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());} if (FLV::Parse_Error){
fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());
}
fprintf(stderr, "User %i disconnected.\n", conn.getSocket()); fprintf(stderr, "User %i disconnected.\n", conn.getSocket());
if (inited){ if (inited){
fprintf(stderr, "Status was: inited\n"); fprintf(stderr, "Status was: inited\n");
@ -407,14 +420,16 @@ namespace Connector_HTTP{
return 0; return 0;
} //Connector_HTTP_Dynamic main function } //Connector_HTTP_Dynamic main function
};//Connector_HTTP_Dynamic namespace } //Connector_HTTP_Dynamic namespace
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION); Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935); conf.addConnectorOptions(1935);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
Socket::Server server_socket = Socket::Server("/tmp/mist/http_dynamic"); Socket::Server server_socket = Socket::Server("/tmp/mist/http_dynamic");
if (!server_socket.connected()){return 1;} if ( !server_socket.connected()){
return 1;
}
conf.activate(); conf.activate();
while (server_socket.connected() && conf.is_active){ while (server_socket.connected() && conf.is_active){

View file

@ -60,7 +60,9 @@ namespace Connector_HTTP{
//we assume the URL is the stream name with a 3 letter extension //we assume the URL is the stream name with a 3 letter extension
streamname = HTTP_R.getUrl().substr(1); streamname = HTTP_R.getUrl().substr(1);
size_t extDot = streamname.rfind('.'); size_t extDot = streamname.rfind('.');
if (extDot != std::string::npos){streamname.resize(extDot);};//strip the extension if (extDot != std::string::npos){
streamname.resize(extDot);
}; //strip the extension
int start = 0; int start = 0;
if ( !HTTP_R.GetVar("start").empty()){ if ( !HTTP_R.GetVar("start").empty()){
start = atoi(HTTP_R.GetVar("start").c_str()); start = atoi(HTTP_R.GetVar("start").c_str());
@ -170,14 +172,18 @@ namespace Connector_HTTP{
}else{ }else{
Util::sleep(1); Util::sleep(1);
} }
if (!ss.connected()){break;} if ( !ss.connected()){
break;
}
} }
} }
conn.close(); conn.close();
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str()); ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
ss.close(); ss.close();
#if DEBUG >= 1 #if DEBUG >= 1
if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());} if (FLV::Parse_Error){
fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());
}
fprintf(stderr, "User %i disconnected.\n", conn.getSocket()); fprintf(stderr, "User %i disconnected.\n", conn.getSocket());
if (inited){ if (inited){
fprintf(stderr, "Status was: inited\n"); fprintf(stderr, "Status was: inited\n");
@ -192,14 +198,16 @@ namespace Connector_HTTP{
return 0; return 0;
} //Connector_HTTP main function } //Connector_HTTP main function
};//Connector_HTTP namespace } //Connector_HTTP namespace
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION); Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935); conf.addConnectorOptions(1935);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
Socket::Server server_socket = Socket::Server("/tmp/mist/http_progressive"); Socket::Server server_socket = Socket::Server("/tmp/mist/http_progressive");
if (!server_socket.connected()){return 1;} if ( !server_socket.connected()){
return 1;
}
conf.activate(); conf.activate();
while (server_socket.connected() && conf.is_active){ while (server_socket.connected() && conf.is_active){

View file

@ -15,7 +15,6 @@
#include <mist/http_parser.h> #include <mist/http_parser.h>
#include <mist/json.h> #include <mist/json.h>
#include <mist/dtsc.h> #include <mist/dtsc.h>
#include <mist/flv_tag.h>
#include <mist/base64.h> #include <mist/base64.h>
#include <mist/amf.h> #include <mist/amf.h>
#include <mist/mp4.h> #include <mist/mp4.h>
@ -30,26 +29,34 @@ namespace Connector_HTTP{
std::string BuildManifest(std::string & MovieId, JSON::Value & metadata){ std::string BuildManifest(std::string & MovieId, JSON::Value & metadata){
std::stringstream Result; std::stringstream Result;
Result << "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"; Result << "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n";
Result << "<SmoothStreamingMedia MajorVersion=\"2\" MinorVersion=\"0\" TimeScale=\"10000000\" Duration=\"" << metadata["lastms"].asInt() << "\">\n"; Result << "<SmoothStreamingMedia MajorVersion=\"2\" MinorVersion=\"0\" TimeScale=\"10000000\" Duration=\"" << metadata["lastms"].asInt()
<< "\">\n";
if (metadata.isMember("audio")){ if (metadata.isMember("audio")){
Result << " <StreamIndex Type=\"audio\" QualityLevels=\"1\" Name=\"audio\" Chunks=\"" << metadata["keytime"].size() << "\" Url=\"Q({bitrate})/A({start time})\">\n"; Result << " <StreamIndex Type=\"audio\" QualityLevels=\"1\" Name=\"audio\" Chunks=\"" << metadata["keytime"].size()
<< "\" Url=\"Q({bitrate})/A({start time})\">\n";
Result << " <QualityLevel Index=\"0\" Bitrate=\"" << metadata["audio"]["bps"].asInt() * 8 << "\" CodecPrivateData=\""; Result << " <QualityLevel Index=\"0\" Bitrate=\"" << metadata["audio"]["bps"].asInt() * 8 << "\" CodecPrivateData=\"";
Result << std::hex; Result << std::hex;
for (int i = 0; i < metadata["audio"]["init"].asString().size(); i++){ for (int i = 0; i < metadata["audio"]["init"].asString().size(); i++){
Result << std::setfill('0') << std::setw(2) << std::right << (int)metadata["audio"]["init"].asString()[i]; Result << std::setfill('0') << std::setw(2) << std::right << (int)metadata["audio"]["init"].asString()[i];
} }
Result << std::dec; Result << std::dec;
Result << "\" SamplingRate=\"" << metadata["audio"]["rate"].asInt() << "\" Channels=\"2\" BitsPerSample=\"16\" PacketSize=\"4\" AudioTag=\"255\" FourCC=\"AACL\" />\n"; Result << "\" SamplingRate=\"" << metadata["audio"]["rate"].asInt()
<< "\" Channels=\"2\" BitsPerSample=\"16\" PacketSize=\"4\" AudioTag=\"255\" FourCC=\"AACL\" />\n";
for (int i = 0; i < metadata["keytime"].size() - 1; i++){ for (int i = 0; i < metadata["keytime"].size() - 1; i++){
Result << " <c "; Result << " <c ";
if( i == 0 ) { Result << "t=\"0\" "; } if (i == 0){
Result << "t=\"0\" ";
}
Result << "d=\"" << 10000 * (metadata["keytime"][i + 1].asInt() - metadata["keytime"][i].asInt()) << "\" />\n"; Result << "d=\"" << 10000 * (metadata["keytime"][i + 1].asInt() - metadata["keytime"][i].asInt()) << "\" />\n";
} }
Result << " <c d=\"" << 10000 * (metadata["lastms"].asInt() - metadata["keytime"][metadata["keytime"].size() - 1].asInt()) << "\" />\n"; Result << " <c d=\"" << 10000 * (metadata["lastms"].asInt() - metadata["keytime"][metadata["keytime"].size() - 1].asInt()) << "\" />\n";
Result << " </StreamIndex>\n"; Result << " </StreamIndex>\n";
} }
if (metadata.isMember("video")){ if (metadata.isMember("video")){
Result << " <StreamIndex Type=\"video\" QualityLevels=\"1\" Name=\"video\" Chunks=\"" << metadata["keytime"].size() << "\" Url=\"Q({bitrate})/V({start time})\" MaxWidth=\"" << metadata["video"]["width"].asInt() << "\" MaxHeight=\"" << metadata["video"]["height"].asInt() << "\" DisplayWidth=\"" << metadata["video"]["width"].asInt() << "\" DisplayHeight=\"" << metadata["video"]["height"].asInt() << "\">\n"; Result << " <StreamIndex Type=\"video\" QualityLevels=\"1\" Name=\"video\" Chunks=\"" << metadata["keytime"].size()
<< "\" Url=\"Q({bitrate})/V({start time})\" MaxWidth=\"" << metadata["video"]["width"].asInt() << "\" MaxHeight=\""
<< metadata["video"]["height"].asInt() << "\" DisplayWidth=\"" << metadata["video"]["width"].asInt() << "\" DisplayHeight=\""
<< metadata["video"]["height"].asInt() << "\">\n";
Result << " <QualityLevel Index=\"0\" Bitrate=\"" << metadata["video"]["bps"].asInt() * 8 << "\" CodecPrivateData=\""; Result << " <QualityLevel Index=\"0\" Bitrate=\"" << metadata["video"]["bps"].asInt() * 8 << "\" CodecPrivateData=\"";
MP4::AVCC avccbox; MP4::AVCC avccbox;
avccbox.setPayload(metadata["video"]["init"].asString()); avccbox.setPayload(metadata["video"]["init"].asString());
@ -59,10 +66,13 @@ namespace Connector_HTTP{
Result << std::setfill('0') << std::setw(2) << std::right << (int)tmpString[i]; Result << std::setfill('0') << std::setw(2) << std::right << (int)tmpString[i];
} }
Result << std::dec; Result << std::dec;
Result << "\" MaxWidth=\"" << metadata["video"]["width"].asInt() << "\" MaxHeight=\"" << metadata["video"]["height"].asInt() << "\" FourCC=\"AVC1\" />\n"; Result << "\" MaxWidth=\"" << metadata["video"]["width"].asInt() << "\" MaxHeight=\"" << metadata["video"]["height"].asInt()
<< "\" FourCC=\"AVC1\" />\n";
for (int i = 0; i < metadata["keytime"].size() - 1; i++){ for (int i = 0; i < metadata["keytime"].size() - 1; i++){
Result << " <c "; Result << " <c ";
if( i == 0 ) { Result << "t=\"0\" "; } if (i == 0){
Result << "t=\"0\" ";
}
Result << "d=\"" << 10000 * (metadata["keytime"][i + 1].asInt() - metadata["keytime"][i].asInt()) << "\" />\n"; Result << "d=\"" << 10000 * (metadata["keytime"][i + 1].asInt() - metadata["keytime"][i].asInt()) << "\" />\n";
} }
Result << " <c d=\"" << 10000 * (metadata["lastms"].asInt() - metadata["keytime"][metadata["keytime"].size() - 1].asInt()) << "\" />\n"; Result << " <c d=\"" << 10000 * (metadata["lastms"].asInt() - metadata["keytime"][metadata["keytime"].size() - 1].asInt()) << "\" />\n";
@ -82,7 +92,6 @@ namespace Connector_HTTP{
std::vector<int> Timestamps; std::vector<int> Timestamps;
int FlashBufSize = 0; int FlashBufSize = 0;
long long int FlashBufTime = 0; long long int FlashBufTime = 0;
FLV::Tag tmp;//temporary tag
DTSC::Stream Strm; //Incoming stream buffer. DTSC::Stream Strm; //Incoming stream buffer.
HTTP::Parser HTTP_R, HTTP_S; //HTTP Receiver en HTTP Sender. HTTP::Parser HTTP_R, HTTP_S; //HTTP Receiver en HTTP Sender.
@ -147,8 +156,12 @@ namespace Connector_HTTP{
tempStr = HTTP_R.url.substr(HTTP_R.url.find(")/") + 2); tempStr = HTTP_R.url.substr(HTTP_R.url.find(")/") + 2);
wantsAudio = false; wantsAudio = false;
wantsVideo = false; wantsVideo = false;
if( tempStr[0] == 'A' ) { wantsAudio = true; } if (tempStr[0] == 'A'){
if( tempStr[0] == 'V' ) { wantsVideo = true; } wantsAudio = true;
}
if (tempStr[0] == 'V'){
wantsVideo = true;
}
tempStr = tempStr.substr(tempStr.find("(") + 1); tempStr = tempStr.substr(tempStr.find("(") + 1);
ReqFragment = atoll(tempStr.substr(0, tempStr.find(")")).c_str()); ReqFragment = atoll(tempStr.substr(0, tempStr.find(")")).c_str());
#if DEBUG >= 4 #if DEBUG >= 4
@ -164,7 +177,9 @@ namespace Connector_HTTP{
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml"); HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){receive_marks = true;} if (Strm.metadata.isMember("length")){
receive_marks = true;
}
std::string manifest = BuildManifest(streamname, Strm.metadata); std::string manifest = BuildManifest(streamname, Strm.metadata);
HTTP_S.SetBody(manifest); HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
@ -228,7 +243,9 @@ namespace Connector_HTTP{
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml"); HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){receive_marks = true;} if (Strm.metadata.isMember("length")){
receive_marks = true;
}
std::string manifest = BuildManifest(streamname, Strm.metadata); std::string manifest = BuildManifest(streamname, Strm.metadata);
HTTP_S.SetBody(manifest); HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
@ -237,7 +254,9 @@ namespace Connector_HTTP{
#endif #endif
pending_manifest = false; pending_manifest = false;
} }
if (!receive_marks && Strm.metadata.isMember("length")){receive_marks = true;} if ( !receive_marks && Strm.metadata.isMember("length")){
receive_marks = true;
}
if (Strm.lastType() == DTSC::PAUSEMARK){ if (Strm.lastType() == DTSC::PAUSEMARK){
Timestamps.push_back(Strm.getPacket(0)["time"].asInt()); Timestamps.push_back(Strm.getPacket(0)["time"].asInt());
} }
@ -279,7 +298,8 @@ namespace Connector_HTTP{
MP4::TRUN trun_box; MP4::TRUN trun_box;
//maybe reinsert dataOffset //maybe reinsert dataOffset
std::cerr << "Setting Flags: " << (MP4::trundataOffset | MP4::trunfirstSampleFlags | MP4::trunsampleDuration | MP4::trunsampleSize) << std::endl; std::cerr << "Setting Flags: " << (MP4::trundataOffset | MP4::trunfirstSampleFlags | MP4::trunsampleDuration | MP4::trunsampleSize)
<< std::endl;
trun_box.setFlags(MP4::trundataOffset | MP4::trunfirstSampleFlags | MP4::trunsampleDuration | MP4::trunsampleSize); trun_box.setFlags(MP4::trundataOffset | MP4::trunfirstSampleFlags | MP4::trunsampleDuration | MP4::trunsampleSize);
trun_box.setDataOffset(42); trun_box.setDataOffset(42);
trun_box.setFirstSampleFlags(0x00000040 | MP4::isIPicture | MP4::noDisposable | MP4::isKeySample); trun_box.setFirstSampleFlags(0x00000040 | MP4::isIPicture | MP4::noDisposable | MP4::isKeySample);
@ -311,14 +331,11 @@ namespace Connector_HTTP{
traf_box.setContent(trun_box, 1); traf_box.setContent(trun_box, 1);
moof_box.setContent(traf_box, 1); moof_box.setContent(traf_box, 1);
//std::cerr << "\t[encoded] = " << ((MP4::TRUN&)(((MP4::TRAF&)(moof_box.getContent(1))).getContent(1))).getDataOffset() << std::endl; //std::cerr << "\t[encoded] = " << ((MP4::TRUN&)(((MP4::TRAF&)(moof_box.getContent(1))).getContent(1))).getDataOffset() << std::endl;
HTTP_S.SetHeader("Content-Length", FlashBufSize + 8 + moof_box.boxedSize()); //32+33+btstrp.size()); HTTP_S.SetHeader("Content-Length", FlashBufSize + 8 + moof_box.boxedSize()); //32+33+btstrp.size());
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
conn.SendNow(moof_box.asBox(), moof_box.boxedSize()); conn.SendNow(moof_box.asBox(), moof_box.boxedSize());
unsigned long size = htonl(FlashBufSize+8); unsigned long size = htonl(FlashBufSize+8);
@ -346,7 +363,9 @@ namespace Connector_HTTP{
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml"); HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){receive_marks = true;} if (Strm.metadata.isMember("length")){
receive_marks = true;
}
std::string manifest = BuildManifest(streamname, Strm.metadata); std::string manifest = BuildManifest(streamname, Strm.metadata);
HTTP_S.SetBody(manifest); HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
@ -356,14 +375,15 @@ namespace Connector_HTTP{
pending_manifest = false; pending_manifest = false;
} }
} }
if (!ss.connected()){break;} if ( !ss.connected()){
break;
}
} }
} }
conn.close(); conn.close();
ss.SendNow(conn.getStats("HTTP_Smooth").c_str()); ss.SendNow(conn.getStats("HTTP_Smooth").c_str());
ss.close(); ss.close();
#if DEBUG >= 1 #if DEBUG >= 1
if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());}
fprintf(stderr, "User %i disconnected.\n", conn.getSocket()); fprintf(stderr, "User %i disconnected.\n", conn.getSocket());
if (inited){ if (inited){
fprintf(stderr, "Status was: inited\n"); fprintf(stderr, "Status was: inited\n");
@ -378,14 +398,16 @@ namespace Connector_HTTP{
return 0; return 0;
} //Connector_HTTP_Smooth main function } //Connector_HTTP_Smooth main function
};//Connector_HTTP_Smooth namespace } //Connector_HTTP_Smooth namespace
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION); Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935); conf.addConnectorOptions(1935);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
Socket::Server server_socket = Socket::Server("/tmp/mist/http_smooth"); Socket::Server server_socket = Socket::Server("/tmp/mist/http_smooth");
if (!server_socket.connected()){return 1;} if ( !server_socket.connected()){
return 1;
}
conf.activate(); conf.activate();
while (server_socket.connected() && conf.is_active){ while (server_socket.connected() && conf.is_active){

View file

@ -43,8 +43,7 @@ namespace Connector_RTMP{
void sendCommand(AMF::Object & amfreply, int messagetype, int stream_id); ///< Sends a RTMP command either in AMF or AMF3 mode. void sendCommand(AMF::Object & amfreply, int messagetype, int stream_id); ///< Sends a RTMP command either in AMF or AMF3 mode.
void parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id); ///< Parses a single AMF command message. void parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id); ///< Parses a single AMF command message.
int Connector_RTMP(Socket::Connection conn); int Connector_RTMP(Socket::Connection conn);
};//Connector_RTMP namespace; } //Connector_RTMP namespace;
/// Main Connector_RTMP function /// Main Connector_RTMP function
int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
@ -53,13 +52,19 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
FLV::Tag tag, init_tag; FLV::Tag tag, init_tag;
DTSC::Stream Strm; DTSC::Stream Strm;
while (!Socket.Received().available(1537) && Socket.connected()){Socket.spool(); Util::sleep(5);} while ( !Socket.Received().available(1537) && Socket.connected()){
Socket.spool();
Util::sleep(5);
}
RTMPStream::handshake_in = Socket.Received().remove(1537); RTMPStream::handshake_in = Socket.Received().remove(1537);
RTMPStream::rec_cnt += 1537; RTMPStream::rec_cnt += 1537;
if (RTMPStream::doHandshake()){ if (RTMPStream::doHandshake()){
Socket.SendNow(RTMPStream::handshake_out); Socket.SendNow(RTMPStream::handshake_out);
while (!Socket.Received().available(1536) && Socket.connected()){Socket.spool(); Util::sleep(5);} while ( !Socket.Received().available(1536) && Socket.connected()){
Socket.spool();
Util::sleep(5);
}
Socket.Received().remove(1536); Socket.Received().remove(1536);
RTMPStream::rec_cnt += 1536; RTMPStream::rec_cnt += 1536;
#if DEBUG >= 4 #if DEBUG >= 4
@ -175,7 +180,9 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
SS.SendNow(Socket.getStats("RTMP").c_str()); SS.SendNow(Socket.getStats("RTMP").c_str());
SS.close(); SS.close();
#if DEBUG >= 1 #if DEBUG >= 1
if (FLV::Parse_Error){fprintf(stderr, "FLV Parse Error: %s\n", FLV::Error_Str.c_str());} if (FLV::Parse_Error){
fprintf(stderr, "FLV Parse Error: %s\n", FLV::Error_Str.c_str());
}
fprintf(stderr, "User %i disconnected.\n", conn.getSocket()); fprintf(stderr, "User %i disconnected.\n", conn.getSocket());
if (inited){ if (inited){
fprintf(stderr, "Status was: inited\n"); fprintf(stderr, "Status was: inited\n");
@ -252,17 +259,34 @@ void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){
#if DEBUG >= 4 #if DEBUG >= 4
short int ucmtype = ntohs(*(short int*)next.data.c_str()); short int ucmtype = ntohs(*(short int*)next.data.c_str());
switch (ucmtype){ switch (ucmtype){
case 0: fprintf(stderr, "CTRL: UCM StreamBegin %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; case 0:
case 1: fprintf(stderr, "CTRL: UCM StreamEOF %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; fprintf(stderr, "CTRL: UCM StreamBegin %i\n", ntohl(*((int*)(next.data.c_str()+2))));
case 2: fprintf(stderr, "CTRL: UCM StreamDry %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; break;
case 3: fprintf(stderr, "CTRL: UCM SetBufferLength %i %i\n", ntohl(*((int*)(next.data.c_str()+2))), ntohl(*((int*)(next.data.c_str()+6)))); break; case 1:
case 4: fprintf(stderr, "CTRL: UCM StreamIsRecorded %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; fprintf(stderr, "CTRL: UCM StreamEOF %i\n", ntohl(*((int*)(next.data.c_str()+2))));
case 6: fprintf(stderr, "CTRL: UCM PingRequest %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; break;
case 7: fprintf(stderr, "CTRL: UCM PingResponse %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; case 2:
default: fprintf(stderr, "CTRL: UCM Unknown (%hi)\n", ucmtype); break; fprintf(stderr, "CTRL: UCM StreamDry %i\n", ntohl(*((int*)(next.data.c_str()+2))));
break;
case 3:
fprintf(stderr, "CTRL: UCM SetBufferLength %i %i\n", ntohl(*((int*)(next.data.c_str()+2))), ntohl(*((int*)(next.data.c_str()+6))));
break;
case 4:
fprintf(stderr, "CTRL: UCM StreamIsRecorded %i\n", ntohl(*((int*)(next.data.c_str()+2))));
break;
case 6:
fprintf(stderr, "CTRL: UCM PingRequest %i\n", ntohl(*((int*)(next.data.c_str()+2))));
break;
case 7:
fprintf(stderr, "CTRL: UCM PingResponse %i\n", ntohl(*((int*)(next.data.c_str()+2))));
break;
default:
fprintf(stderr, "CTRL: UCM Unknown (%hi)\n", ucmtype);
break;
} }
#endif #endif
} break; }
break;
case 5: //window size of other end case 5: //window size of other end
#if DEBUG >= 4 #if DEBUG >= 4
fprintf(stderr, "CTRL: Window size\n"); fprintf(stderr, "CTRL: Window size\n");
@ -336,7 +360,8 @@ void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){
amfdata = AMF::parse(next.data); amfdata = AMF::parse(next.data);
parseAMFCommand(amfdata, 17, next.msg_stream_id); parseAMFCommand(amfdata, 17, next.msg_stream_id);
} //parsing AMF0-style } //parsing AMF0-style
} break; }
break;
case 19: case 19:
#if DEBUG >= 4 #if DEBUG >= 4
fprintf(stderr, "Received AFM0 shared object\n"); fprintf(stderr, "Received AFM0 shared object\n");
@ -345,7 +370,8 @@ void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){
case 20: { //AMF0 command message case 20: { //AMF0 command message
amfdata = AMF::parse(next.data); amfdata = AMF::parse(next.data);
parseAMFCommand(amfdata, 20, next.msg_stream_id); parseAMFCommand(amfdata, 20, next.msg_stream_id);
} break; }
break;
case 22: case 22:
#if DEBUG >= 4 #if DEBUG >= 4
fprintf(stderr, "Received aggregate message\n"); fprintf(stderr, "Received aggregate message\n");
@ -388,13 +414,21 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
int tmpint; int tmpint;
if (amfdata.getContentP(2)->getContentP("videoCodecs")){ if (amfdata.getContentP(2)->getContentP("videoCodecs")){
tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} if (tmpint & 0x04){
if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} fprintf(stderr, "Sorensen video support detected\n");
}
if (tmpint & 0x80){
fprintf(stderr, "H264 video support detected\n");
}
} }
if (amfdata.getContentP(2)->getContentP("audioCodecs")){ if (amfdata.getContentP(2)->getContentP("audioCodecs")){
tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} if (tmpint & 0x04){
if (tmpint & 0x400){fprintf(stderr, "AAC audio support detected\n");} fprintf(stderr, "MP3 audio support detected\n");
}
if (tmpint & 0x400){
fprintf(stderr, "AAC audio support detected\n");
}
} }
#endif #endif
RTMPStream::chunk_snd_max = 4096; RTMPStream::chunk_snd_max = 4096;
@ -439,7 +473,9 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
return; return;
} //createStream } //createStream
if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){
if (SS.connected()){SS.close();} if (SS.connected()){
SS.close();
}
return; return;
} }
if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
@ -578,7 +614,9 @@ int main(int argc, char ** argv){
conf.addConnectorOptions(1935); conf.addConnectorOptions(1935);
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
Socket::Server server_socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface")); Socket::Server server_socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"));
if (!server_socket.connected()){return 1;} if ( !server_socket.connected()){
return 1;
}
conf.activate(); conf.activate();
while (server_socket.connected() && conf.is_active){ while (server_socket.connected() && conf.is_active){

View file

@ -2,26 +2,8 @@
/// Contains all code for the controller executable. /// Contains all code for the controller executable.
#include <iostream> #include <iostream>
#include <fstream>
#include <string>
#include <sstream>
#include <vector> #include <vector>
#include <map>
#include <set>
#include <cstdlib>
#include <queue>
#include <cmath>
#include <cstdio>
#include <climits>
#include <cstring>
#include <unistd.h>
#include <getopt.h>
#include <set>
#include <sys/wait.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <sstream>
#include <mist/config.h> #include <mist/config.h>
#include <mist/socket.h> #include <mist/socket.h>
#include <mist/http_parser.h> #include <mist/http_parser.h>
@ -30,6 +12,8 @@
#include <mist/timing.h> #include <mist/timing.h>
#include "controller_storage.h" #include "controller_storage.h"
#include "controller_connectors.h" #include "controller_connectors.h"
#include "controller_streams.h"
#include "controller_capabilities.h"
#include "server.html.h" #include "server.html.h"
#define UPLINK_INTERVAL 30 #define UPLINK_INTERVAL 30
@ -38,17 +22,8 @@
namespace Controller { namespace Controller {
std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers.
Secure::Auth keychecker; ///< Checks key authorization. Secure::Auth keychecker; ///< Checks key authorization.
void WriteFile( std::string Filename, std::string contents ) {
std::ofstream File;
File.open( Filename.c_str( ) );
File << contents << std::endl;
File.close( );
}
class ConnectedUser{ class ConnectedUser{
public: public:
Socket::Connection C; Socket::Connection C;
@ -84,7 +59,8 @@ void Authorize( JSON::Value & Request, JSON::Value & Response, ConnectedUser & c
} }
} }
if (UserID != ""){ if (UserID != ""){
if (Request["authorize"]["password"].asString() != "" && Secure::md5(Storage["account"][UserID]["password"].asString()) != Request["authorize"]["password"].asString()){ if (Request["authorize"]["password"].asString() != ""
&& Secure::md5(Storage["account"][UserID]["password"].asString()) != Request["authorize"]["password"].asString()){
Log("AUTH", "Failed login attempt " + UserID + " @ " + conn.C.getHost()); Log("AUTH", "Failed login attempt " + UserID + " @ " + conn.C.getHost());
} }
} }
@ -117,47 +93,6 @@ void CheckConfig(JSON::Value & in, JSON::Value & out){
out = in; out = in;
} }
bool streamsEqual(JSON::Value & one, JSON::Value & two){
if (one["channel"]["URL"] != two["channel"]["URL"]){return false;}
if (one["preset"]["cmd"] != two["preset"]["cmd"]){return false;}
return true;
}
void startStream(std::string name, JSON::Value & data){
std::string URL = data["channel"]["URL"];
std::string preset = data["preset"]["cmd"];
std::string cmd1, cmd2, cmd3;
if (URL.substr(0, 4) == "push"){
std::string pusher = URL.substr(7);
cmd2 = "MistBuffer -s "+name+" "+pusher;
Util::Procs::Start(name, Util::getMyPath() + cmd2);
Log("BUFF", "(re)starting stream buffer "+name+" for push data from "+pusher);
}else{
if (URL.substr(0, 1) == "/"){
struct stat fileinfo;
if (stat(URL.c_str(), &fileinfo) != 0 || S_ISDIR(fileinfo.st_mode)){
Log("BUFF", "Warning for VoD stream "+name+"! File not found: "+URL);
data["error"] = "Not found: "+URL;
return;
}
cmd1 = "cat "+URL;
data["error"] = "Available";
return; //MistPlayer handles VoD
}else{
cmd1 = "ffmpeg -re -async 2 -i "+URL+" "+preset+" -f flv -";
cmd2 = "MistFLV2DTSC";
}
cmd3 = "MistBuffer -s "+name;
if (cmd2 != ""){
Util::Procs::Start(name, cmd1, Util::getMyPath() + cmd2, Util::getMyPath() + cmd3);
Log("BUFF", "(re)starting stream buffer "+name+" for ffmpeg data: "+cmd1);
}else{
Util::Procs::Start(name, cmd1, Util::getMyPath() + cmd3);
Log("BUFF", "(re)starting stream buffer "+name+" using input file "+URL);
}
}
}
void CheckStats(JSON::Value & stats){ void CheckStats(JSON::Value & stats){
long long int currTime = Util::epoch(); long long int currTime = Util::epoch();
for (JSON::ObjIter jit = stats.ObjBegin(); jit != stats.ObjEnd(); jit++){ for (JSON::ObjIter jit = stats.ObjBegin(); jit != stats.ObjEnd(); jit++){
@ -170,7 +105,9 @@ void CheckStats(JSON::Value & stats){
if (u_it->second.isMember("now") && u_it->second["now"].asInt() < currTime - 3){ if (u_it->second.isMember("now") && u_it->second["now"].asInt() < currTime - 3){
jit->second["log"].append(u_it->second); jit->second["log"].append(u_it->second);
jit->second["curr"].removeMember(u_it->first); jit->second["curr"].removeMember(u_it->first);
if (!jit->second["curr"].size()){break;} if ( !jit->second["curr"].size()){
break;
}
u_it = jit->second["curr"].ObjBegin(); u_it = jit->second["curr"].ObjBegin();
} }
} }
@ -179,194 +116,38 @@ void CheckStats(JSON::Value & stats){
} }
} }
class cpudata { } //Controller namespace
public:
std::string model;
int cores;
int threads;
int mhz;
int id;
cpudata(){
model = "Unknown";
cores = 1;
threads = 1;
mhz = 0;
id = 0;
};
void fill(char * data){
int i;
i = 0;
if (sscanf(data, "model name : %n", &i) != EOF && i > 0){model = (data+i);}
if (sscanf(data, "cpu cores : %d", &i) == 1){cores = i;}
if (sscanf(data, "siblings : %d", &i) == 1){threads = i;}
if (sscanf(data, "physical id : %d", &i) == 1){id = i;}
if (sscanf(data, "cpu MHz : %d", &i) == 1){mhz = i;}
};
};
void checkCapable(JSON::Value & capa){
capa.null();
std::ifstream cpuinfo("/proc/cpuinfo");
if (cpuinfo){
std::map<int, cpudata> cpus;
char line[300];
int proccount = -1;
while (cpuinfo.good()){
cpuinfo.getline(line, 300);
if (cpuinfo.fail()){
//empty lines? ignore them, clear flags, continue
if (!cpuinfo.eof()){
cpuinfo.ignore();
cpuinfo.clear();
}
continue;
}
if (memcmp(line, "processor", 9) == 0){proccount++;}
cpus[proccount].fill(line);
}
//fix wrong core counts
std::map<int,int> corecounts;
for (int i = 0; i <= proccount; ++i){
corecounts[cpus[i].id]++;
}
//remove double physical IDs - we only want real CPUs.
std::set<int> used_physids;
int total_speed = 0;
int total_threads = 0;
for (int i = 0; i <= proccount; ++i){
if (!used_physids.count(cpus[i].id)){
used_physids.insert(cpus[i].id);
JSON::Value thiscpu;
thiscpu["model"] = cpus[i].model;
thiscpu["cores"] = cpus[i].cores;
if (cpus[i].cores < 2 && corecounts[cpus[i].id] > cpus[i].cores){
thiscpu["cores"] = corecounts[cpus[i].id];
}
thiscpu["threads"] = cpus[i].threads;
if (thiscpu["cores"].asInt() > thiscpu["threads"].asInt()){
thiscpu["threads"] = thiscpu["cores"];
}
thiscpu["mhz"] = cpus[i].mhz;
capa["cpu"].append(thiscpu);
total_speed += cpus[i].cores * cpus[i].mhz;
total_threads += cpus[i].threads;
}
}
capa["speed"] = total_speed;
capa["threads"] = total_threads;
}
std::ifstream meminfo("/proc/meminfo");
if (meminfo){
char line[300];
int bufcache = 0;
while (meminfo.good()){
meminfo.getline(line, 300);
if (meminfo.fail()){
//empty lines? ignore them, clear flags, continue
if (!meminfo.eof()){
meminfo.ignore();
meminfo.clear();
}
continue;
}
long long int i;
if (sscanf(line, "MemTotal : %Li kB", &i) == 1){capa["mem"]["total"] = i/1024;}
if (sscanf(line, "MemFree : %Li kB", &i) == 1){capa["mem"]["free"] = i/1024;}
if (sscanf(line, "SwapTotal : %Li kB", &i) == 1){capa["mem"]["swaptotal"] = i/1024;}
if (sscanf(line, "SwapFree : %Li kB", &i) == 1){capa["mem"]["swapfree"] = i/1024;}
if (sscanf(line, "Buffers : %Li kB", &i) == 1){bufcache += i/1024;}
if (sscanf(line, "Cached : %Li kB", &i) == 1){bufcache += i/1024;}
}
capa["mem"]["used"] = capa["mem"]["total"].asInt() - capa["mem"]["free"].asInt() - bufcache;
capa["mem"]["cached"] = bufcache;
capa["load"]["memory"] = ((capa["mem"]["used"].asInt() + (capa["mem"]["swaptotal"].asInt() - capa["mem"]["swapfree"].asInt())) * 100) / capa["mem"]["total"].asInt();
}
std::ifstream loadavg("/proc/loadavg");
if (loadavg){
char line[300];
int bufcache = 0;
loadavg.getline(line, 300);
//parse lines here
float onemin, fivemin, fifteenmin;
if (sscanf(line, "%f %f %f", &onemin, &fivemin, &fifteenmin) == 3){
capa["load"]["one"] = (long long int)(onemin * 100);
capa["load"]["five"] = (long long int)(onemin * 100);
capa["load"]["fifteen"] = (long long int)(onemin * 100);
}
}
}
void CheckAllStreams(JSON::Value & data){
long long int currTime = Util::epoch();
for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){
if (!Util::Procs::isActive(jit->first)){
startStream(jit->first, jit->second);
}
if (currTime - lastBuffer[jit->first] > 5){
if (jit->second.isMember("error") && jit->second["error"].asString() != ""){
jit->second["online"] = jit->second["error"];
}else{
jit->second["online"] = 0;
}
}else{
jit->second["online"] = 1;
}
}
static JSON::Value strlist;
bool changed = false;
if (strlist["config"] != Storage["config"]){
strlist["config"] = Storage["config"];
changed = true;
}
if (strlist["streams"] != Storage["streams"]){
strlist["streams"] = Storage["streams"];
changed = true;
}
if (changed){WriteFile("/tmp/mist/streamlist", strlist.toString());}
}
void CheckStreams(JSON::Value & in, JSON::Value & out){
bool changed = false;
for (JSON::ObjIter jit = in.ObjBegin(); jit != in.ObjEnd(); jit++){
if (out.isMember(jit->first)){
if (!streamsEqual(jit->second, out[jit->first])){
Log("STRM", std::string("Updated stream ")+jit->first);
Util::Procs::Stop(jit->first);
startStream(jit->first, jit->second);
}
}else{
Log("STRM", std::string("New stream ")+jit->first);
startStream(jit->first, jit->second);
}
}
for (JSON::ObjIter jit = out.ObjBegin(); jit != out.ObjEnd(); jit++){
if (!in.isMember(jit->first)){
Log("STRM", std::string("Deleted stream ")+jit->first);
Util::Procs::Stop(jit->first);
}
}
out = in;
}
}; //Connector namespace
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Controller::Storage = JSON::fromFile("config.json"); Controller::Storage = JSON::fromFile("config.json");
JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}"); JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}");
stored_port["default"] = Controller::Storage["config"]["controller"]["port"]; stored_port["default"] = Controller::Storage["config"]["controller"]["port"];
if (!stored_port["default"]){stored_port["default"] = 4242;} if ( !stored_port["default"]){
JSON::Value stored_interface = JSON::fromString("{\"long\":\"interface\", \"short\":\"i\", \"arg\":\"string\", \"help\":\"Interface address to listen on, or 0.0.0.0 for all available interfaces.\"}"); stored_port["default"] = 4242;
}
JSON::Value stored_interface =
JSON::fromString(
"{\"long\":\"interface\", \"short\":\"i\", \"arg\":\"string\", \"help\":\"Interface address to listen on, or 0.0.0.0 for all available interfaces.\"}");
stored_interface["default"] = Controller::Storage["config"]["controller"]["interface"]; stored_interface["default"] = Controller::Storage["config"]["controller"]["interface"];
if (!stored_interface["default"]){stored_interface["default"] = "0.0.0.0";} if ( !stored_interface["default"]){
JSON::Value stored_user = JSON::fromString("{\"long\":\"username\", \"short\":\"u\", \"arg\":\"string\", \"help\":\"Username to drop privileges to, or root to not drop provileges.\"}"); stored_interface["default"] = "0.0.0.0";
}
JSON::Value stored_user = JSON::fromString(
"{\"long\":\"username\", \"short\":\"u\", \"arg\":\"string\", \"help\":\"Username to drop privileges to, or root to not drop provileges.\"}");
stored_user["default"] = Controller::Storage["config"]["controller"]["username"]; stored_user["default"] = Controller::Storage["config"]["controller"]["username"];
if (!stored_user["default"]){stored_user["default"] = "root";} if ( !stored_user["default"]){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); stored_user["default"] = "root";
}
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION " / " RELEASE);
conf.addOption("listen_port", stored_port); conf.addOption("listen_port", stored_port);
conf.addOption("listen_interface", stored_interface); conf.addOption("listen_interface", stored_interface);
conf.addOption("username", stored_user); conf.addOption("username", stored_user);
conf.addOption("daemonize", JSON::fromString("{\"long\":\"daemon\", \"short\":\"d\", \"default\":1, \"long_off\":\"nodaemon\", \"short_off\":\"n\", \"help\":\"Whether or not to daemonize the process after starting.\"}")); conf.addOption("daemonize",
conf.addOption("account", JSON::fromString("{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}")); JSON::fromString(
"{\"long\":\"daemon\", \"short\":\"d\", \"default\":1, \"long_off\":\"nodaemon\", \"short_off\":\"n\", \"help\":\"Whether or not to daemonize the process after starting.\"}"));
conf.addOption("account",
JSON::fromString(
"{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}"));
conf.addOption("uplink", JSON::fromString("{\"default\":0, \"help\":\"Enable MistSteward uplink.\", \"short\":\"U\", \"long\":\"uplink\"}")); conf.addOption("uplink", JSON::fromString("{\"default\":0, \"help\":\"Enable MistSteward uplink.\", \"short\":\"U\", \"long\":\"uplink\"}"));
conf.parseArgs(argc, argv); conf.parseArgs(argc, argv);
@ -414,13 +195,16 @@ int main(int argc, char ** argv){
users.erase(it); users.erase(it);
break; break;
} }
if (it->clientMode){uplink = &*it; gotUplink = true;} if (it->clientMode){
uplink = & *it;
gotUplink = true;
}
} }
} }
if ( !gotUplink){ if ( !gotUplink){
Incoming = Socket::Connection("gearbox.ddvtech.com", 4242, true); Incoming = Socket::Connection("gearbox.ddvtech.com", 4242, true);
if (Incoming.connected()){ if (Incoming.connected()){
users.push_back(Incoming); users.push_back((Controller::ConnectedUser)Incoming);
users.back().clientMode = true; users.back().clientMode = true;
uplink = &users.back(); uplink = &users.back();
gotUplink = true; gotUplink = true;
@ -445,9 +229,13 @@ int main(int argc, char ** argv){
} }
Incoming = API_Socket.accept(true); Incoming = API_Socket.accept(true);
if (Incoming.connected()){users.push_back(Incoming);} if (Incoming.connected()){
users.push_back((Controller::ConnectedUser)Incoming);
}
Incoming = Stats_Socket.accept(true); Incoming = Stats_Socket.accept(true);
if (Incoming.connected()){buffers.push_back(Incoming);} if (Incoming.connected()){
buffers.push_back(Incoming);
}
if (buffers.size() > 0){ if (buffers.size() > 0){
for (std::vector<Socket::Connection>::iterator it = buffers.begin(); it != buffers.end(); it++){ for (std::vector<Socket::Connection>::iterator it = buffers.begin(); it != buffers.end(); it++){
if ( !it->connected()){ if ( !it->connected()){
@ -492,7 +280,8 @@ int main(int argc, char ** argv){
Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()] = Request["vod"]; Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()] = Request["vod"];
Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()].removeMember("meta"); Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()].removeMember("meta");
JSON::Value nowtotal; JSON::Value nowtotal;
for (JSON::ObjIter u_it = Controller::Storage["statistics"][oit->first]["curr"].ObjBegin(); u_it != Controller::Storage["statistics"][oit->first]["curr"].ObjEnd(); ++u_it){ for (JSON::ObjIter u_it = Controller::Storage["statistics"][oit->first]["curr"].ObjBegin();
u_it != Controller::Storage["statistics"][oit->first]["curr"].ObjEnd(); ++u_it){
nowtotal["up"] = nowtotal["up"].asInt() + u_it->second["up"].asInt(); nowtotal["up"] = nowtotal["up"].asInt() + u_it->second["up"].asInt();
nowtotal["down"] = nowtotal["down"].asInt() + u_it->second["down"].asInt(); nowtotal["down"] = nowtotal["down"].asInt() + u_it->second["down"].asInt();
nowtotal["count"] = nowtotal["count"].asInt() + 1; nowtotal["count"] = nowtotal["count"].asInt() + 1;
@ -555,8 +344,12 @@ int main(int argc, char ** argv){
} }
} }
}else{ }else{
if (Request.isMember("config")){Controller::CheckConfig(Request["config"], Controller::Storage["config"]);} if (Request.isMember("config")){
if (Request.isMember("streams")){Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);} Controller::CheckConfig(Request["config"], Controller::Storage["config"]);
}
if (Request.isMember("streams")){
Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);
}
if (Request.isMember("clearstatlogs")){ if (Request.isMember("clearstatlogs")){
Controller::Storage["log"].null(); Controller::Storage["log"].null();
Controller::Storage["statistics"].null(); Controller::Storage["statistics"].null();
@ -576,8 +369,12 @@ int main(int argc, char ** argv){
Authorize(Request, Response, ( *it)); Authorize(Request, Response, ( *it));
if (it->Authorized){ if (it->Authorized){
//Parse config and streams from the request. //Parse config and streams from the request.
if (Request.isMember("config")){Controller::CheckConfig(Request["config"], Controller::Storage["config"]);} if (Request.isMember("config")){
if (Request.isMember("streams")){Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);} Controller::CheckConfig(Request["config"], Controller::Storage["config"]);
}
if (Request.isMember("streams")){
Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);
}
if (Request.isMember("save")){ if (Request.isMember("save")){
Controller::WriteFile("config.json", Controller::Storage.toString()); Controller::WriteFile("config.json", Controller::Storage.toString());
Controller::Log("CONF", "Config written to file on request through API"); Controller::Log("CONF", "Config written to file on request through API");
@ -589,7 +386,9 @@ int main(int argc, char ** argv){
Response["streams"] = Controller::Storage["streams"]; Response["streams"] = Controller::Storage["streams"];
//add required data to the current unix time to the config, for syncing reasons //add required data to the current unix time to the config, for syncing reasons
Response["config"]["time"] = Util::epoch(); Response["config"]["time"] = Util::epoch();
if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";} if ( !Response["config"].isMember("serverid")){
Response["config"]["serverid"] = "";
}
//sent any available logs and statistics //sent any available logs and statistics
Response["log"] = Controller::Storage["log"]; Response["log"] = Controller::Storage["log"];
Response["statistics"] = Controller::Storage["statistics"]; Response["statistics"] = Controller::Storage["statistics"];
@ -600,8 +399,12 @@ int main(int argc, char ** argv){
} }
} }
jsonp = ""; jsonp = "";
if (it->H.GetVar("callback") != ""){jsonp = it->H.GetVar("callback");} if (it->H.GetVar("callback") != ""){
if (it->H.GetVar("jsonp") != ""){jsonp = it->H.GetVar("jsonp");} jsonp = it->H.GetVar("callback");
}
if (it->H.GetVar("jsonp") != ""){
jsonp = it->H.GetVar("jsonp");
}
it->H.Clean(); it->H.Clean();
it->H.SetHeader("Content-Type", "text/javascript"); it->H.SetHeader("Content-Type", "text/javascript");
if (jsonp == ""){ if (jsonp == ""){

View file

@ -0,0 +1,154 @@
#include <stdio.h>
#include <string.h>
#include <fstream>
#include <set>
#include "controller_capabilities.h"
namespace Controller {
class cpudata{
public:
std::string model;
int cores;
int threads;
int mhz;
int id;
cpudata(){
model = "Unknown";
cores = 1;
threads = 1;
mhz = 0;
id = 0;
}
;
void fill(char * data){
int i;
i = 0;
if (sscanf(data, "model name : %n", &i) != EOF && i > 0){
model = (data + i);
}
if (sscanf(data, "cpu cores : %d", &i) == 1){
cores = i;
}
if (sscanf(data, "siblings : %d", &i) == 1){
threads = i;
}
if (sscanf(data, "physical id : %d", &i) == 1){
id = i;
}
if (sscanf(data, "cpu MHz : %d", &i) == 1){
mhz = i;
}
}
;
};
void checkCapable(JSON::Value & capa){
capa.null();
std::ifstream cpuinfo("/proc/cpuinfo");
if (cpuinfo){
std::map<int, cpudata> cpus;
char line[300];
int proccount = -1;
while (cpuinfo.good()){
cpuinfo.getline(line, 300);
if (cpuinfo.fail()){
//empty lines? ignore them, clear flags, continue
if ( !cpuinfo.eof()){
cpuinfo.ignore();
cpuinfo.clear();
}
continue;
}
if (memcmp(line, "processor", 9) == 0){
proccount++;
}
cpus[proccount].fill(line);
}
//fix wrong core counts
std::map<int, int> corecounts;
for (int i = 0; i <= proccount; ++i){
corecounts[cpus[i].id]++;
}
//remove double physical IDs - we only want real CPUs.
std::set<int> used_physids;
int total_speed = 0;
int total_threads = 0;
for (int i = 0; i <= proccount; ++i){
if ( !used_physids.count(cpus[i].id)){
used_physids.insert(cpus[i].id);
JSON::Value thiscpu;
thiscpu["model"] = cpus[i].model;
thiscpu["cores"] = cpus[i].cores;
if (cpus[i].cores < 2 && corecounts[cpus[i].id] > cpus[i].cores){
thiscpu["cores"] = corecounts[cpus[i].id];
}
thiscpu["threads"] = cpus[i].threads;
if (thiscpu["cores"].asInt() > thiscpu["threads"].asInt()){
thiscpu["threads"] = thiscpu["cores"];
}
thiscpu["mhz"] = cpus[i].mhz;
capa["cpu"].append(thiscpu);
total_speed += cpus[i].cores * cpus[i].mhz;
total_threads += cpus[i].threads;
}
}
capa["speed"] = total_speed;
capa["threads"] = total_threads;
}
std::ifstream meminfo("/proc/meminfo");
if (meminfo){
char line[300];
int bufcache = 0;
while (meminfo.good()){
meminfo.getline(line, 300);
if (meminfo.fail()){
//empty lines? ignore them, clear flags, continue
if ( !meminfo.eof()){
meminfo.ignore();
meminfo.clear();
}
continue;
}
long long int i;
if (sscanf(line, "MemTotal : %Li kB", &i) == 1){
capa["mem"]["total"] = i / 1024;
}
if (sscanf(line, "MemFree : %Li kB", &i) == 1){
capa["mem"]["free"] = i / 1024;
}
if (sscanf(line, "SwapTotal : %Li kB", &i) == 1){
capa["mem"]["swaptotal"] = i / 1024;
}
if (sscanf(line, "SwapFree : %Li kB", &i) == 1){
capa["mem"]["swapfree"] = i / 1024;
}
if (sscanf(line, "Buffers : %Li kB", &i) == 1){
bufcache += i / 1024;
}
if (sscanf(line, "Cached : %Li kB", &i) == 1){
bufcache += i / 1024;
}
}
capa["mem"]["used"] = capa["mem"]["total"].asInt() - capa["mem"]["free"].asInt() - bufcache;
capa["mem"]["cached"] = bufcache;
capa["load"]["memory"] = ((capa["mem"]["used"].asInt() + (capa["mem"]["swaptotal"].asInt() - capa["mem"]["swapfree"].asInt())) * 100)
/ capa["mem"]["total"].asInt();
}
std::ifstream loadavg("/proc/loadavg");
if (loadavg){
char line[300];
int bufcache = 0;
loadavg.getline(line, 300);
//parse lines here
float onemin, fivemin, fifteenmin;
if (sscanf(line, "%f %f %f", &onemin, &fivemin, &fifteenmin) == 3){
capa["load"]["one"] = (long long int)(onemin * 100);
capa["load"]["five"] = (long long int)(onemin * 100);
capa["load"]["fifteen"] = (long long int)(onemin * 100);
}
}
}
}

View file

@ -0,0 +1,5 @@
#include <mist/json.h>
namespace Controller {
void checkCapable(JSON::Value & capa);
}

View file

@ -2,11 +2,35 @@
#include <mist/config.h> #include <mist/config.h>
#include <mist/procs.h> #include <mist/procs.h>
#include "controller_storage.h" #include "controller_storage.h"
#include "controller_connectors.h"
namespace Controller { namespace Controller {
void CheckProtocols(JSON::Value & p){
static std::map<std::string, std::string> current_connectors; static std::map<std::string, std::string> current_connectors;
/// Checks if the binary mentioned in the protocol argument is currently active, if so, restarts it.
void UpdateProtocol(std::string protocol){
std::map<std::string, std::string>::iterator iter;
for (iter = current_connectors.begin(); iter != current_connectors.end(); iter++){
if (iter->second.substr(0, protocol.size()) == protocol){
Log("CONF", "Restarting connector for update: " + iter->second);
Util::Procs::Stop(iter->first);
int i = 0;
while (Util::Procs::isActive(iter->first) && i < 30){
Util::sleep(100);
}
if (i >= 30){
Log("WARN", "Connector still active 3 seconds after shutdown - delaying restart.");
}else{
Util::Procs::Start(iter->first, Util::getMyPath() + iter->second);
}
return;
}
}
}
/// Checks current protocol configuration, updates state of enabled connectors if neccesary.
void CheckProtocols(JSON::Value & p){
std::map<std::string, std::string> new_connectors; std::map<std::string, std::string> new_connectors;
std::map<std::string, std::string>::iterator iter; std::map<std::string, std::string>::iterator iter;
bool haveHTTPgeneric = false; bool haveHTTPgeneric = false;
@ -16,11 +40,17 @@ namespace Controller{
JSON::Value counter = (long long int)0; JSON::Value counter = (long long int)0;
for (JSON::ArrIter ait = p.ArrBegin(); ait != p.ArrEnd(); ait++){ for (JSON::ArrIter ait = p.ArrBegin(); ait != p.ArrEnd(); ait++){
if (!(*ait).isMember("connector") || (*ait)["connector"].asString() == ""){continue;} if ( !( *ait).isMember("connector") || ( *ait)["connector"].asString() == ""){
continue;
}
tmp = std::string("MistConn") + ( *ait)["connector"].asString() + std::string(" -n"); tmp = std::string("MistConn") + ( *ait)["connector"].asString() + std::string(" -n");
if ((*ait)["connector"].asString() == "HTTP"){haveHTTPgeneric = true;} if (( *ait)["connector"].asString() == "HTTP"){
if ((*ait)["connector"].asString() != "HTTP" && (*ait)["connector"].asString().substr(0, 4) == "HTTP"){haveHTTPspecific = true;} haveHTTPgeneric = true;
}
if (( *ait)["connector"].asString() != "HTTP" && ( *ait)["connector"].asString().substr(0, 4) == "HTTP"){
haveHTTPspecific = true;
}
if (( *ait).isMember("port") && ( *ait)["port"].asInt() != 0){ if (( *ait).isMember("port") && ( *ait)["port"].asInt() != 0){
tmp += std::string(" -p ") + ( *ait)["port"].asString(); tmp += std::string(" -p ") + ( *ait)["port"].asString();
@ -38,7 +68,6 @@ namespace Controller{
tmp += std::string(" ") + ( *ait)["args"].asString(); tmp += std::string(" ") + ( *ait)["args"].asString();
} }
counter = counter.asInt() + 1; counter = counter.asInt() + 1;
new_connectors[std::string("Conn") + counter.asString()] = tmp; new_connectors[std::string("Conn") + counter.asString()] = tmp;
if (Util::Procs::isActive(std::string("Conn") + counter.asString())){ if (Util::Procs::isActive(std::string("Conn") + counter.asString())){
@ -75,5 +104,4 @@ namespace Controller{
current_connectors = new_connectors; current_connectors = new_connectors;
} }
} }

View file

@ -1,5 +1,11 @@
#include <mist/json.h>
namespace Controller { namespace Controller {
/// Checks if the binary mentioned in the protocol argument is currently active, if so, restarts it.
void UpdateProtocol(std::string protocol);
/// Checks current protocol configuration, updates state of enabled connectors if neccesary.
void CheckProtocols(JSON::Value & p); void CheckProtocols(JSON::Value & p);
} }

View file

@ -1,4 +1,5 @@
#include <iostream> #include <iostream>
#include <fstream>
#include <mist/timing.h> #include <mist/timing.h>
#include "controller_storage.h" #include "controller_storage.h"
@ -11,7 +12,9 @@ namespace Controller{
//if last log message equals this one, do not log. //if last log message equals this one, do not log.
if (Storage["log"].size() > 0){ if (Storage["log"].size() > 0){
JSON::ArrIter it = Storage["log"].ArrEnd() - 1; JSON::ArrIter it = Storage["log"].ArrEnd() - 1;
if ((*it)[2] == message){return;} if (( *it)[2] == message){
return;
}
} }
JSON::Value m; JSON::Value m;
m.append(Util::epoch()); m.append(Util::epoch());
@ -22,4 +25,12 @@ namespace Controller{
std::cout << "[" << kind << "] " << message << std::endl; std::cout << "[" << kind << "] " << message << std::endl;
} }
/// Write contents to Filename
void WriteFile(std::string Filename, std::string contents){
std::ofstream File;
File.open(Filename.c_str());
File << contents << std::endl;
File.close();
}
} }

View file

@ -8,4 +8,7 @@ namespace Controller{
/// Store and print a log message. /// Store and print a log message.
void Log(std::string kind, std::string message); void Log(std::string kind, std::string message);
/// Write contents to Filename.
void WriteFile(std::string Filename, std::string contents);
} }

111
src/controller_streams.cpp Normal file
View file

@ -0,0 +1,111 @@
#include <mist/procs.h>
#include <mist/config.h>
#include <mist/timing.h>
#include "controller_streams.h"
#include "controller_storage.h"
#include <sys/stat.h>
namespace Controller {
std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers.
bool streamsEqual(JSON::Value & one, JSON::Value & two){
if (one["channel"]["URL"] != two["channel"]["URL"]){
return false;
}
if (one["preset"]["cmd"] != two["preset"]["cmd"]){
return false;
}
return true;
}
void startStream(std::string name, JSON::Value & data){
std::string URL = data["channel"]["URL"];
std::string preset = data["preset"]["cmd"];
std::string cmd1, cmd2, cmd3;
if (URL.substr(0, 4) == "push"){
std::string pusher = URL.substr(7);
cmd2 = "MistBuffer -s " + name + " " + pusher;
Util::Procs::Start(name, Util::getMyPath() + cmd2);
Log("BUFF", "(re)starting stream buffer " + name + " for push data from " + pusher);
}else{
if (URL.substr(0, 1) == "/"){
struct stat fileinfo;
if (stat(URL.c_str(), &fileinfo) != 0 || S_ISDIR(fileinfo.st_mode)){
Log("BUFF", "Warning for VoD stream " + name + "! File not found: " + URL);
data["error"] = "Not found: " + URL;
return;
}
cmd1 = "cat " + URL;
data["error"] = "Available";
return; //MistPlayer handles VoD
}else{
cmd1 = "ffmpeg -re -async 2 -i " + URL + " " + preset + " -f flv -";
cmd2 = "MistFLV2DTSC";
}
cmd3 = "MistBuffer -s " + name;
if (cmd2 != ""){
Util::Procs::Start(name, cmd1, Util::getMyPath() + cmd2, Util::getMyPath() + cmd3);
Log("BUFF", "(re)starting stream buffer " + name + " for ffmpeg data: " + cmd1);
}else{
Util::Procs::Start(name, cmd1, Util::getMyPath() + cmd3);
Log("BUFF", "(re)starting stream buffer " + name + " using input file " + URL);
}
}
}
void CheckAllStreams(JSON::Value & data){
long long int currTime = Util::epoch();
for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){
if ( !Util::Procs::isActive(jit->first)){
startStream(jit->first, jit->second);
}
if (currTime - lastBuffer[jit->first] > 5){
if (jit->second.isMember("error") && jit->second["error"].asString() != ""){
jit->second["online"] = jit->second["error"];
}else{
jit->second["online"] = 0;
}
}else{
jit->second["online"] = 1;
}
}
static JSON::Value strlist;
bool changed = false;
if (strlist["config"] != Storage["config"]){
strlist["config"] = Storage["config"];
changed = true;
}
if (strlist["streams"] != Storage["streams"]){
strlist["streams"] = Storage["streams"];
changed = true;
}
if (changed){
WriteFile("/tmp/mist/streamlist", strlist.toString());
}
}
void CheckStreams(JSON::Value & in, JSON::Value & out){
bool changed = false;
for (JSON::ObjIter jit = in.ObjBegin(); jit != in.ObjEnd(); jit++){
if (out.isMember(jit->first)){
if ( !streamsEqual(jit->second, out[jit->first])){
Log("STRM", std::string("Updated stream ") + jit->first);
Util::Procs::Stop(jit->first);
startStream(jit->first, jit->second);
}
}else{
Log("STRM", std::string("New stream ") + jit->first);
startStream(jit->first, jit->second);
}
}
for (JSON::ObjIter jit = out.ObjBegin(); jit != out.ObjEnd(); jit++){
if ( !in.isMember(jit->first)){
Log("STRM", std::string("Deleted stream ") + jit->first);
Util::Procs::Stop(jit->first);
}
}
out = in;
}
} //Controller namespace

10
src/controller_streams.h Normal file
View file

@ -0,0 +1,10 @@
#include <mist/json.h>
namespace Controller {
extern std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers.
bool streamsEqual(JSON::Value & one, JSON::Value & two);
void startStream(std::string name, JSON::Value & data);
void CheckAllStreams(JSON::Value & data);
void CheckStreams(JSON::Value & in, JSON::Value & out);
} //Controller namespace

View file

@ -58,7 +58,7 @@ namespace Converters{
return 0; return 0;
} //FLV2DTSC } //FLV2DTSC
};//Converter namespace } //Converter namespace
/// Entry point for DTSC2FLV, simply calls Converters::DTSC2FLV(). /// Entry point for DTSC2FLV, simply calls Converters::DTSC2FLV().
int main(int argc, char ** argv){ int main(int argc, char ** argv){

View file

@ -48,12 +48,18 @@ namespace Converters{
F.seekNext(); F.seekNext();
while ( !F.getJSON().isNull()){ while ( !F.getJSON().isNull()){
nowpack = F.getJSON()["time"].asInt(); nowpack = F.getJSON()["time"].asInt();
if (firstpack == 0){firstpack = nowpack;} if (firstpack == 0){
firstpack = nowpack;
}
if (F.getJSON()["datatype"].asString() == "audio"){ if (F.getJSON()["datatype"].asString() == "audio"){
if (lastaudio != 0 && (nowpack - lastaudio) != 0){ if (lastaudio != 0 && (nowpack - lastaudio) != 0){
bps = F.getJSON()["data"].asString().size() / (nowpack - lastaudio); bps = F.getJSON()["data"].asString().size() / (nowpack - lastaudio);
if (bps < aud_min){aud_min = bps;} if (bps < aud_min){
if (bps > aud_max){aud_max = bps;} aud_min = bps;
}
if (bps > aud_max){
aud_max = bps;
}
} }
totalaudio += F.getJSON()["data"].asString().size(); totalaudio += F.getJSON()["data"].asString().size();
lastaudio = nowpack; lastaudio = nowpack;
@ -61,24 +67,36 @@ namespace Converters{
if (F.getJSON()["datatype"].asString() == "video"){ if (F.getJSON()["datatype"].asString() == "video"){
if (lastvideo != 0 && (nowpack - lastvideo) != 0){ if (lastvideo != 0 && (nowpack - lastvideo) != 0){
bps = F.getJSON()["data"].asString().size() / (nowpack - lastvideo); bps = F.getJSON()["data"].asString().size() / (nowpack - lastvideo);
if (bps < vid_min){vid_min = bps;} if (bps < vid_min){
if (bps > vid_max){vid_max = bps;} vid_min = bps;
}
if (bps > vid_max){
vid_max = bps;
}
} }
if (F.getJSON()["keyframe"].asInt() != 0){ if (F.getJSON()["keyframe"].asInt() != 0){
meta["keytime"].append(F.getJSON()["time"]); meta["keytime"].append(F.getJSON()["time"]);
meta["keybpos"].append(F.getLastReadPos()); meta["keybpos"].append(F.getLastReadPos());
if (lastkey != 0){ if (lastkey != 0){
bps = nowpack - lastkey; bps = nowpack - lastkey;
if (bps < key_min){key_min = bps;} if (bps < key_min){
if (bps > key_max){key_max = bps;} key_min = bps;
}
if (bps > key_max){
key_max = bps;
}
} }
keyframes++; keyframes++;
lastkey = nowpack; lastkey = nowpack;
} }
if (F.getJSON()["offset"].asInt() != 0){ if (F.getJSON()["offset"].asInt() != 0){
bps = F.getJSON()["offset"].asInt(); bps = F.getJSON()["offset"].asInt();
if (bps < bfrm_min){bfrm_min = bps;} if (bps < bfrm_min){
if (bps > bfrm_max){bfrm_max = bps;} bfrm_min = bps;
}
if (bps > bfrm_max){
bfrm_max = bps;
}
} }
totalvideo += F.getJSON()["data"].asString().size(); totalvideo += F.getJSON()["data"].asString().size();
lastvideo = nowpack; lastvideo = nowpack;
@ -119,7 +137,7 @@ namespace Converters{
} }
} //DTSCFix } //DTSCFix
}; }
/// Entry point for FLV2DTSC, simply calls Converters::FLV2DTSC(). /// Entry point for FLV2DTSC, simply calls Converters::FLV2DTSC().
int main(int argc, char ** argv){ int main(int argc, char ** argv){

View file

@ -31,7 +31,9 @@ namespace Converters{
while ( !feof(stdin)){ while ( !feof(stdin)){
if (FLV_in.FileLoader(stdin)){ if (FLV_in.FileLoader(stdin)){
pack_out = FLV_in.toJSON(meta_out); pack_out = FLV_in.toJSON(meta_out);
if (pack_out.isNull()){continue;} if (pack_out.isNull()){
continue;
}
if ( !sending){ if ( !sending){
counter++; counter++;
if (counter > 8){ if (counter > 8){
@ -71,7 +73,7 @@ namespace Converters{
return 0; return 0;
} //FLV2DTSC } //FLV2DTSC
};//Buffer namespace }
/// Entry point for FLV2DTSC, simply calls Converters::FLV2DTSC(). /// Entry point for FLV2DTSC, simply calls Converters::FLV2DTSC().
int main(int argc, char ** argv){ int main(int argc, char ** argv){

View file

@ -32,8 +32,11 @@ class Stats{
std::string connector; std::string connector;
unsigned int conntime; unsigned int conntime;
Stats(){ Stats(){
up = 0; down = 0; conntime = 0; up = 0;
}; down = 0;
conntime = 0;
}
;
/// Reads a stats string and parses it to the internal representation. /// Reads a stats string and parses it to the internal representation.
Stats(std::string s){ Stats(std::string s){
size_t f = s.find(' '); size_t f = s.find(' ');
@ -57,7 +60,7 @@ class Stats{
s.erase(0, f + 1); s.erase(0, f + 1);
down = atoi(s.c_str()); down = atoi(s.c_str());
} }
}; }
}; };
int main(int argc, char** argv){ int main(int argc, char** argv){
@ -109,7 +112,8 @@ int main(int argc, char** argv){
std::cerr << "Received push - ignoring (" << in_out.Received().get() << ")" << std::endl; std::cerr << "Received push - ignoring (" << in_out.Received().get() << ")" << std::endl;
#endif #endif
in_out.close(); //pushing to VoD makes no sense in_out.close(); //pushing to VoD makes no sense
} break; }
break;
case 'S': { //Stats case 'S': { //Stats
if ( !StatsSocket.connected()){ if ( !StatsSocket.connected()){
StatsSocket = Socket::Connection("/tmp/mist/statistics", true); StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
@ -137,34 +141,42 @@ int main(int argc, char** argv){
StatsSocket.Send("\n\n"); StatsSocket.Send("\n\n");
StatsSocket.flush(); StatsSocket.flush();
} }
} break; }
break;
case 's': { //second-seek case 's': { //second-seek
int ms = JSON::Value(in_out.Received().get().substr(2)).asInt(); int ms = JSON::Value(in_out.Received().get().substr(2)).asInt();
bool ret = source.seek_time(ms); bool ret = source.seek_time(ms);
lastTime = 0; lastTime = 0;
} break; }
break;
case 'f': { //frame-seek case 'f': { //frame-seek
bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt()); bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt());
lastTime = 0; lastTime = 0;
} break; }
break;
case 'p': { //play case 'p': { //play
playing = -1; playing = -1;
lastTime = 0; lastTime = 0;
in_out.setBlocking(false); in_out.setBlocking(false);
} break; }
break;
case 'o': { //once-play case 'o': { //once-play
if (playing <= 0){playing = 1;} if (playing <= 0){
playing = 1;
}
++playing; ++playing;
in_out.setBlocking(false); in_out.setBlocking(false);
#if DEBUG >= 4 #if DEBUG >= 4
std::cerr << "Playing one keyframe" << std::endl; std::cerr << "Playing one keyframe" << std::endl;
#endif #endif
bench = Util::getMS(); bench = Util::getMS();
} break; }
break;
case 'q': { //quit-playing case 'q': { //quit-playing
playing = 0; playing = 0;
in_out.setBlocking(true); in_out.setBlocking(true);
} break; }
break;
} }
in_out.Received().get().clear(); in_out.Received().get().clear();
} }
@ -173,13 +185,17 @@ int main(int argc, char** argv){
if (playing != 0){ if (playing != 0){
now = Util::getMS(); now = Util::getMS();
source.seekNext(); source.seekNext();
if (!source.getJSON()){playing = 0;} if ( !source.getJSON()){
playing = 0;
}
if (source.getJSON().isMember("keyframe")){ if (source.getJSON().isMember("keyframe")){
if (playing == -1 && meta["video"]["keyms"].asInt() > now - lastTime){ if (playing == -1 && meta["video"]["keyms"].asInt() > now - lastTime){
Util::sleep(meta["video"]["keyms"].asInt() - (now - lastTime)); Util::sleep(meta["video"]["keyms"].asInt() - (now - lastTime));
} }
lastTime = now; lastTime = now;
if (playing > 0){--playing;} if (playing > 0){
--playing;
}
} }
if (playing == 0){ if (playing == 0){
#if DEBUG >= 4 #if DEBUG >= 4