Converted HTTP based outputs to new and improved mechanism, increasing robustness and efficiency.

This commit is contained in:
Thulinma 2014-10-31 00:38:25 +01:00
parent b325ca96ee
commit d457046420
32 changed files with 1113 additions and 1510 deletions

View file

@ -23,7 +23,7 @@ LDLIBS = -lmist -lrt
.DEFAULT_GOAL := all
all: MistConnHTTP controller analysers inputs outputs
all: controller analysers inputs outputs
DOXYGEN := $(shell doxygen -v 2> /dev/null)
ifdef DOXYGEN
@ -37,11 +37,6 @@ MistController: override LDLIBS += $(THREADLIB)
MistController: src/controller/server.html.h src/controller/*
$(CXX) $(LDFLAGS) $(CPPFLAGS) src/controller/*.cpp $(LDLIBS) -o $@
connectors: MistConnHTTP
MistConnHTTP: override LDLIBS += $(THREADLIB)
MistConnHTTP: src/connectors/conn_http.cpp src/connectors/embed.js.h src/connectors/icon.h
$(CXX) $(LDFLAGS) $(CPPFLAGS) $< $(LDLIBS) -o $@
analysers: MistAnalyserRTMP
MistAnalyserRTMP: src/analysers/rtmp_analyser.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
@ -70,34 +65,6 @@ analysers: MistInfo
MistInfo: src/analysers/info.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
converters: MistDTSC2FLV
MistDTSC2FLV: src/converters/dtsc2flv.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
converters: MistFLV2DTSC
MistFLV2DTSC: src/converters/flv2dtsc.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
converters: MistDTSCFix
MistDTSCFix: src/converters/dtscfix.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
converters: MistDTSCMerge
MistDTSCMerge: src/converters/dtscmerge.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
converters: MistDTSC2TS
MistDTSC2TS: src/converters/dtsc2ts.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
converters: MistSRT2DTSC
MistSRT2DTSC: src/converters/srt2dtsc.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
converters: MistDTSC2SRT
MistDTSC2SRT: src/converters/dtsc2srt.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
inputs: MistInDTSC
MistInDTSC: override LDLIBS += $(THREADLIB)
MistInDTSC: override CPPFLAGS += "-DINPUTTYPE=\"input_dtsc.h\""
@ -125,19 +92,19 @@ MistInBuffer: src/input/mist_in.cpp src/input/input.cpp src/input/input_buffer.c
outputs: MistOutFLV
MistOutFLV: override LDLIBS += $(THREADLIB)
MistOutFLV: override CPPFLAGS += "-DOUTPUTTYPE=\"output_progressive_flv.h\""
MistOutFLV: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_progressive_flv.cpp
MistOutFLV: src/output/mist_out.cpp src/output/output_http.cpp src/output/output.cpp src/output/output_progressive_flv.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutMP4
MistOutMP4: override LDLIBS += $(THREADLIB)
MistOutMP4: override CPPFLAGS += "-DOUTPUTTYPE=\"output_progressive_mp4.h\""
MistOutMP4: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_progressive_mp4.cpp
MistOutMP4: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_progressive_mp4.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutMP3
MistOutMP3: override LDLIBS += $(THREADLIB)
MistOutMP3: override CPPFLAGS += "-DOUTPUTTYPE=\"output_progressive_mp3.h\""
MistOutMP3: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_progressive_mp3.cpp
MistOutMP3: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_progressive_mp3.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutRTMP
@ -155,7 +122,7 @@ MistOutRaw: src/output/mist_out.cpp src/output/output.cpp src/output/output_raw.
outputs: MistOutHTTPTS
MistOutHTTPTS: override LDLIBS += $(THREADLIB)
MistOutHTTPTS: override CPPFLAGS += "-DOUTPUTTYPE=\"output_httpts.h\""
MistOutHTTPTS: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_httpts.cpp
MistOutHTTPTS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_httpts.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutTS
@ -164,37 +131,42 @@ MistOutTS: override CPPFLAGS += "-DOUTPUTTYPE=\"output_ts.h\""
MistOutTS: src/output/mist_out.cpp src/output/output.cpp src/output/output_ts.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutHTTP
MistOutHTTP: override LDLIBS += $(THREADLIB)
MistOutHTTP: override CPPFLAGS += "-DOUTPUTTYPE=\"output_http_internal.h\""
MistOutHTTP: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_http_internal.cpp src/embed.js.h
$(CXX) $(LDFLAGS) $(CPPFLAGS) src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_http_internal.cpp $(LDLIBS) -o $@
outputs: MistOutHSS
MistOutHSS: override LDLIBS += $(THREADLIB)
MistOutHSS: override CPPFLAGS += "-DOUTPUTTYPE=\"output_hss.h\""
MistOutHSS: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_hss.cpp
MistOutHSS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hss.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutHLS
MistOutHLS: override LDLIBS += $(THREADLIB)
MistOutHLS: override CPPFLAGS += "-DOUTPUTTYPE=\"output_hls.h\""
MistOutHLS: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_hls.cpp
MistOutHLS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hls.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutHDS
MistOutHDS: override LDLIBS += $(THREADLIB)
MistOutHDS: override CPPFLAGS += "-DOUTPUTTYPE=\"output_hds.h\""
MistOutHDS: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_hds.cpp
MistOutHDS: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_hds.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutSRT
MistOutSRT: override LDLIBS += $(THREADLIB)
MistOutSRT: override CPPFLAGS += "-DOUTPUTTYPE=\"output_srt.h\""
MistOutSRT: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_srt.cpp
MistOutSRT: src/output/mist_out.cpp src/output/output_http.cpp src/output/output.cpp src/output/output_srt.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
outputs: MistOutJSON
MistOutJSON: override LDLIBS += $(THREADLIB)
MistOutJSON: override CPPFLAGS += "-DOUTPUTTYPE=\"output_json.h\""
MistOutJSON: src/output/mist_out_http.cpp src/output/output.cpp src/output/output_json.cpp
MistOutJSON: src/output/mist_out.cpp src/output/output.cpp src/output/output_http.cpp src/output/output_json.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
BUILT_SOURCES=controller/server.html.h connectors/embed.js.h
lspSOURCES=lsp/plugins/jquery.js lsp/plugins/placeholder.js lsp/plugins/md5.js lsp/main.js lsp/pages.js lsp/plugins/tablesort.js lsp/plugins/jquery.flot.min.js lsp/plugins/jquery.flot.time.min.js lsp/plugins/jquery.flot.crosshair.min.js
lspDATA=lsp/header.html lsp/main.css lsp/footer.html
@ -209,9 +181,9 @@ endif
sourcery: src/sourcery.cpp
$(CXX) -o $@ $(CPPFLAGS) $^
src/connectors/embed.js.h: src/connectors/embed.js sourcery
$(CLOSURE) src/connectors/embed.js > embed.min.js
./sourcery embed.min.js embed_js > src/connectors/embed.js.h
src/embed.js.h: src/embed.js sourcery
$(CLOSURE) src/embed.js > embed.min.js
./sourcery embed.min.js embed_js > src/embed.js.h
rm embed.min.js
src/controller/server.html: $(lspDATA) $(lspSOURCES)

View file

@ -1,744 +0,0 @@
/// \file conn_http.cpp
/// Contains the main code for the HTTP Connector
#include <iostream>
#include <queue>
#include <set>
#include <sstream>
#include <ctime>
#include <cstdlib>
#include <cstdio>
#include <cmath>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h> //
#include <getopt.h>
#include <mist/socket.h>
#include <mist/http_parser.h>
#include <mist/config.h>
#include <mist/stream.h>
#include <mist/timing.h>
#include <mist/auth.h>
#include <mist/procs.h>
#include <mist/tinythread.h>
#include <mist/defines.h>
#include <mist/dtsc.h>
#include <mist/shared_memory.h>
#include "embed.js.h"
/// Holds everything unique to HTTP Connectors.
namespace Connector_HTTP {
static inline void builPipedPart(JSON::Value & p, char * argarr[], int & argnum, JSON::Value & argset){
for (JSON::ObjIter it = argset.ObjBegin(); it != argset.ObjEnd(); ++it){
if (it->second.isMember("option") && p.isMember(it->first)){
if (it->second.isMember("type")){
if (it->second["type"].asStringRef() == "str" && !p[it->first].isString()){
p[it->first] = p[it->first].asString();
}
if ((it->second["type"].asStringRef() == "uint" || it->second["type"].asStringRef() == "int") && !p[it->first].isInt()){
p[it->first] = JSON::Value(p[it->first].asInt()).asString();
}
}
if (p[it->first].asStringRef().size() > 0){
argarr[argnum++] = (char*)(it->second["option"].c_str());
argarr[argnum++] = (char*)(p[it->first].c_str());
}
}
}
}
/// Class for keeping track of connections to connectors.
class ConnConn{
public:
Socket::Connection * conn; ///< The socket of this connection
unsigned int lastUse; ///< Seconds since last use of this connection.
tthread::mutex inUse; ///< Mutex for this connection.
/// Constructor that sets the socket and lastUse to 0.
ConnConn(){
conn = 0;
lastUse = 0;
}
/// Constructor that sets lastUse to 0, but socket to s.
ConnConn(Socket::Connection * s){
conn = s;
lastUse = 0;
}
/// Destructor that deletes the socket if non-null.
~ConnConn(){
if (conn){
conn->close();
delete conn;
}
conn = 0;
}
};
std::map<std::string, ConnConn *> connectorConnections; ///< Connections to connectors
tthread::mutex connMutex; ///< Mutex for adding/removing connector connections.
bool timeoutThreadStarted = false;
tthread::mutex timeoutStartMutex; ///< Mutex for starting timeout thread.
tthread::mutex timeoutMutex; ///< Mutex for timeout thread.
tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors.
IPC::sharedPage serverCfg; ///< Contains server configuration and capabilities
///\brief Function run as a thread to timeout requests on the proxy.
///\param n A NULL-pointer
void proxyTimeoutThread(void * n){
n = 0; //prevent unused variable warning
tthread::lock_guard<tthread::mutex> guard(timeoutMutex);
timeoutThreadStarted = true;
while (true){
{
tthread::lock_guard<tthread::mutex> guard(connMutex);
if (connectorConnections.empty()){
return;
}
std::map<std::string, ConnConn*>::iterator it;
for (it = connectorConnections.begin(); it != connectorConnections.end(); it++){
ConnConn* ccPointer = it->second;
if ( !ccPointer->conn->connected() || ccPointer->lastUse++ > 15){
if (ccPointer->inUse.try_lock()){
connectorConnections.erase(it);
ccPointer->inUse.unlock();
delete ccPointer;
it = connectorConnections.begin(); //get a valid iterator
if (it == connectorConnections.end()){
return;
}
}
}
}
}
usleep(1000000); //sleep 1 second and re-check
}
}
///\brief Handles requests without associated handler.
///
///Displays a friendly error message.
///\param H The request to be handled.
///\param conn The connection to the client that issued the request.
///\return A timestamp indicating when the request was parsed.
long long int proxyHandleUnsupported(HTTP::Parser & H, Socket::Connection & conn){
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody(
"<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>");
long long int ret = Util::getMS();
conn.SendNow(H.BuildResponse("415", "Unsupported Media Type"));
return ret;
}
///\brief Handles requests that have timed out.
///
///Displays a friendly error message.
///\param H The request that was being handled upon timeout.
///\param conn The connection to the client that issued the request.
///\param msg The message to print to the client.
///\return A timestamp indicating when the request was parsed.
long long int proxyHandleTimeout(HTTP::Parser & H, Socket::Connection & conn, std::string msg){
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody(
"<!DOCTYPE html><html><head><title>"+msg+"</title></head><body><h1>"+msg+"</h1>Though the server understood your request and attempted to handle it, somehow handling it took longer than it should. Your request has been cancelled - please try again later.</body></html>");
long long int ret = Util::getMS();
conn.SendNow(H.BuildResponse("504", msg));
return ret;
}
/// Sorts the JSON::Value objects that hold source information by preference.
struct sourceCompare {
bool operator() (const JSON::Value& lhs, const JSON::Value& rhs) const {
//first compare simultaneous tracks
if (lhs["simul_tracks"].asInt() > rhs["simul_tracks"].asInt()){
//more tracks = higher priority = true.
return true;
}
if (lhs["simul_tracks"].asInt() < rhs["simul_tracks"].asInt()){
//less tracks = lower priority = false
return false;
}
//same amount of tracks - compare "hardcoded" priorities
if (lhs["priority"].asInt() > rhs["priority"].asInt()){
//higher priority = true.
return true;
}
if (lhs["priority"].asInt() < rhs["priority"].asInt()){
//lower priority = false
return false;
}
//same priority - compare total matches
if (lhs["total_matches"].asInt() > rhs["total_matches"].asInt()){
//more matches = higher priority = true.
return true;
}
if (lhs["total_matches"].asInt() < rhs["total_matches"].asInt()){
//less matches = lower priority = false
return false;
}
//also same amount of matches? just compare the URL then.
return lhs["url"].asStringRef() < rhs["url"].asStringRef();
}
};
void addSource(const std::string & rel, std::set<JSON::Value, sourceCompare> & sources, std::string & host, const std::string & port, JSON::Value & conncapa, unsigned int most_simul, unsigned int total_matches){
JSON::Value tmp;
tmp["type"] = conncapa["type"];
tmp["relurl"] = rel;
tmp["priority"] = conncapa["priority"];
tmp["simul_tracks"] = most_simul;
tmp["total_matches"] = total_matches;
tmp["url"] = conncapa["handler"].asStringRef() + "://" + host + ":" + port + rel;
sources.insert(tmp);
}
void addSources(std::string & streamname, const std::string & rel, std::set<JSON::Value, sourceCompare> & sources, std::string & host, const std::string & port, JSON::Value & conncapa, JSON::Value & strmMeta){
unsigned int most_simul = 0;
unsigned int total_matches = 0;
if (conncapa.isMember("codecs") && conncapa["codecs"].size() > 0){
for (JSON::ArrIter it = conncapa["codecs"].ArrBegin(); it != conncapa["codecs"].ArrEnd(); it++){
unsigned int simul = 0;
if ((*it).size() > 0){
for (JSON::ArrIter itb = (*it).ArrBegin(); itb != (*it).ArrEnd(); itb++){
unsigned int matches = 0;
if ((*itb).size() > 0){
for (JSON::ArrIter itc = (*itb).ArrBegin(); itc != (*itb).ArrEnd(); itc++){
for (JSON::ObjIter trit = strmMeta["tracks"].ObjBegin(); trit != strmMeta["tracks"].ObjEnd(); trit++){
if (trit->second["codec"].asStringRef() == (*itc).asStringRef()){
matches++;
total_matches++;
}
}
}
}
if (matches){
simul++;
}
}
}
if (simul > most_simul){
most_simul = simul;
}
}
}
if (conncapa.isMember("methods") && conncapa["methods"].size() > 0){
std::string relurl;
size_t found = rel.find('$');
if (found != std::string::npos){
relurl = rel.substr(0, found) + streamname + rel.substr(found+1);
}else{
relurl = "/";
}
for (JSON::ArrIter it = conncapa["methods"].ArrBegin(); it != conncapa["methods"].ArrEnd(); it++){
if (!strmMeta.isMember("live") || !it->isMember("nolive")){
addSource(relurl, sources, host, port, *it, most_simul, total_matches);
}
}
}
}
///\brief Handles requests within the proxy.
///
///Currently supported urls:
/// - /crossdomain.xml
/// - /clientaccesspolicy.xml
/// - *.ico (for favicon)
/// - /info_[streamname].js (stream info)
/// - /embed_[streamname].js (embed info)
///
///Unsupported urls default to proxyHandleUnsupported( ).
///\param H The request to be handled.
///\param conn The connection to the client that issued the request.
///\return A timestamp indicating when the request was parsed.
long long int proxyHandleInternal(HTTP::Parser & H, Socket::Connection & conn){
std::string url = H.getUrl();
if (url == "/crossdomain.xml"){
H.Clean();
H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody(
"<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><allow-access-from domain=\"*\" /><site-control permitted-cross-domain-policies=\"all\"/></cross-domain-policy>");
long long int ret = Util::getMS();
conn.SendNow(H.BuildResponse("200", "OK"));
return ret;
} //crossdomain.xml
if (url == "/clientaccesspolicy.xml"){
H.Clean();
H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody(
"<?xml version=\"1.0\" encoding=\"utf-8\"?><access-policy><cross-domain-access><policy><allow-from http-methods=\"*\" http-request-headers=\"*\"><domain uri=\"*\"/></allow-from><grant-to><resource path=\"/\" include-subpaths=\"true\"/></grant-to></policy></cross-domain-access></access-policy>");
long long int ret = Util::getMS();
conn.SendNow(H.BuildResponse("200", "OK"));
return ret;
} //clientaccesspolicy.xml
// send logo icon
if (url.length() > 4 && url.substr(url.length() - 4, 4) == ".ico"){
H.Clean();
#include "icon.h"
H.SetHeader("Content-Type", "image/x-icon");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetHeader("Content-Length", icon_len);
long long int ret = Util::getMS();
H.SendResponse("200", "OK", conn);
conn.SendNow((const char*)icon_data, icon_len);
return ret;
}
// send logo icon
if (url.length() > 6 && url.substr(url.length() - 5, 5) == ".html"){
std::string streamname = url.substr(1, url.length() - 6);
Util::sanitizeName(streamname);
H.Clean();
H.SetHeader("Content-Type", "text/html");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody("<!DOCTYPE html><html><head><title>Stream "+streamname+"</title><style>BODY{color:white;background:black;}</style></head><body><script src=\"embed_"+streamname+".js\"></script></body></html>");
long long int ret = Util::getMS();
H.SendResponse("200", "OK", conn);
return ret;
}
// send smil MBR index
if (url.length() > 6 && url.substr(url.length() - 5, 5) == ".smil"){
std::string streamname = url.substr(1, url.length() - 6);
Util::sanitizeName(streamname);
std::string host = H.GetHeader("Host");
if (host.find(':')){
host.resize(host.find(':'));
}
std::string port, url_rel;
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember("RTMP");
unsigned int pro_cnt = prtcls.getSize();
for (unsigned int i = 0; i < pro_cnt; ++i){
if (prtcls.getIndice(i).getMember("connector").asString() != "RTMP"){
continue;
}
port = prtcls.getIndice(i).getMember("port").asString();
//get the default port if none is set
if (!port.size()){
port = capa.getMember("optional").getMember("port").getMember("default").asString();
}
//extract url
url_rel = capa.getMember("url_rel").asString();
if (url_rel.find('$')){
url_rel.resize(url_rel.find('$'));
}
}
std::string trackSources;//this string contains all track sources for MBR smil
DTSC::Scan tracks = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(streamname).getMember("meta").getMember("tracks");
unsigned int track_ctr = tracks.getSize();
for (unsigned int i = 0; i < track_ctr; ++i){//for all video tracks
DTSC::Scan trk = tracks.getIndice(i);
if (trk.getMember("type").asString() == "video"){
trackSources += " <video src='"+ streamname + "?track=" + trk.getMember("trackid").asString() + "' height='" + trk.getMember("height").asString() + "' system-bitrate='" + trk.getMember("bps").asString() + "' width='" + trk.getMember("width").asString() + "' />\n";
}
}
configLock.post();
configLock.close();
H.Clean();
H.SetHeader("Content-Type", "application/smil");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody("<smil>\n <head>\n <meta base='rtmp://" + host + ":" + port + url_rel + "' />\n </head>\n <body>\n <switch>\n"+trackSources+" </switch>\n </body>\n</smil>");
long long int ret = Util::getMS();
H.SendResponse("200", "OK", conn);
return ret;
}
if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js")
|| (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js")){
std::string streamname;
if (url.substr(0, 6) == "/info_"){
streamname = url.substr(6, url.length() - 9);
}else{
streamname = url.substr(7, url.length() - 10);
}
Util::sanitizeName(streamname);
std::string response;
std::string host = H.GetHeader("Host");
if (host.find(':') != std::string::npos){
host.resize(host.find(':'));
}
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetHeader("Content-Type", "application/javascript");
response = "// Generating info code for stream " + streamname + "\n\nif (!mistvideo){var mistvideo = {};}\n";
JSON::Value json_resp;
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
IPC::semaphore metaLocker(std::string("liveMeta@" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
bool metaLock = false;
configLock.wait();
DTSC::Scan strm = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(streamname).getMember("meta");
IPC::sharedPage streamIndex;
if (!strm){
configLock.post();
//Stream metadata not found - attempt to start it
if (Util::startInput(streamname)){
streamIndex.init(streamname, 8 * 1024 * 1024);
if (streamIndex.mapped){
metaLock = true;
metaLocker.wait();
strm = DTSC::Packet(streamIndex.mapped, streamIndex.len, true).getScan();
}
}
if (!strm){
//stream failed to start or isn't configured
response += "// Stream isn't configured and/or couldn't be started. Sorry.\n";
}
configLock.wait();
}
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
if (strm && prots){
DTSC::Scan trcks = strm.getMember("tracks");
unsigned int trcks_ctr = trcks.getSize();
for (unsigned int i = 0; i < trcks_ctr; ++i){
if (trcks.getIndice(i).getMember("width").asInt() > json_resp["width"].asInt()){
json_resp["width"] = trcks.getIndice(i).getMember("width").asInt();
}
if (trcks.getIndice(i).getMember("height").asInt() > json_resp["height"].asInt()){
json_resp["height"] = trcks.getIndice(i).getMember("height").asInt();
}
}
if (json_resp["width"].asInt() < 1 || json_resp["height"].asInt() < 1){
json_resp["width"] = 640ll;
json_resp["height"] = 480ll;
}
if (strm.getMember("vod")){
json_resp["type"] = "vod";
}
if (strm.getMember("live")){
json_resp["type"] = "live";
}
// show ALL the meta datas!
json_resp["meta"] = strm.asJSON();
for (JSON::ObjIter it = json_resp["meta"]["tracks"].ObjBegin(); it != json_resp["meta"]["tracks"].ObjEnd(); ++it){
it->second.removeMember("fragments");
it->second.removeMember("keys");
it->second.removeMember("parts");
}
//create a set for storing source information
std::set<JSON::Value, sourceCompare> sources;
//find out which connectors are enabled
std::set<std::string> conns;
unsigned int prots_ctr = prots.getSize();
for (unsigned int i = 0; i < prots_ctr; ++i){
conns.insert(prots.getIndice(i).getMember("connector").asString());
}
//loop over the connectors.
for (unsigned int i = 0; i < prots_ctr; ++i){
std::string cName = prots.getIndice(i).getMember("connector").asString();
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName);
//if the connector has a port,
if (capa.getMember("optional").getMember("port")){
//get the default port if none is set
std::string port = prots.getIndice(i).getMember("port").asString();
if (!port.size()){
port = capa.getMember("optional").getMember("port").getMember("default").asString();
}
//and a URL - then list the URL
if (capa.getMember("url_rel")){
JSON::Value capa_json = capa.asJSON();
addSources(streamname, capa.getMember("url_rel").asString(), sources, host, port, capa_json, json_resp["meta"]);
}
//check each enabled protocol separately to see if it depends on this connector
DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors");
unsigned int capa_lst_ctr = capa_lst.getSize();
for (unsigned int j = 0; j < capa_lst_ctr; ++j){
//if it depends on this connector and has a URL, list it
if (conns.count(capa_lst.getIndiceName(j)) && (capa_lst.getIndice(j).getMember("deps").asString() == cName || capa_lst.getIndice(j).getMember("deps").asString() + ".exe" == cName) && capa_lst.getIndice(j).getMember("methods")){
JSON::Value capa_json = capa_lst.getIndice(j).asJSON();
addSources(streamname, capa_lst.getIndice(j).getMember("url_rel").asString(), sources, host, port, capa_json, json_resp["meta"]);
}
}
}
}
//loop over the added sources, add them to json_resp["sources"]
for (std::set<JSON::Value, sourceCompare>::iterator it = sources.begin(); it != sources.end(); it++){
if ((*it)["simul_tracks"].asInt() > 0){
json_resp["source"].append(*it);
}
}
}else{
json_resp["error"] = "The specified stream is not available on this server.";
}
if (metaLock){
metaLocker.post();
}
configLock.post();
configLock.close();
response += "mistvideo['" + streamname + "'] = " + json_resp.toString() + ";\n";
if (url.substr(0, 6) != "/info_" && !json_resp.isMember("error")){
response.append("\n(");
if (embed_js[embed_js_len - 2] == ';'){//check if we have a trailing ;\n or just \n
response.append((char*)embed_js, (size_t)embed_js_len - 2); //remove trailing ";\n" from xxd conversion
}else{
response.append((char*)embed_js, (size_t)embed_js_len - 1); //remove trailing "\n" from xxd conversion
}
response.append("(\"" + streamname + "\"));\n");
}
H.SetBody(response);
long long int ret = Util::getMS();
H.SendResponse("200", "OK", conn);
return ret;
} //embed code generator
return proxyHandleUnsupported(H, conn); //anything else doesn't get handled
}
///\brief Handles requests by starting a corresponding output process.
///\param H The request to be handled
///\param conn The connection to the client that issued the request.
///\param connector The type of connector to be invoked.
///\return -1 on failure, else 0.
long long int proxyHandleThroughConnector(HTTP::Parser & H, Socket::Connection & conn, std::string & connector){
//create a unique ID based on a hash of the user agent and host, followed by the stream name and connector
std::stringstream uidtemp;
/// \todo verify the correct formation of the uid
uidtemp << Secure::md5(H.GetHeader("User-Agent") + conn.getHost()) << "_" << H.GetVar("stream") << "_" << connector;
std::string uid = uidtemp.str();
//fdIn and fdOut are connected to conn.sock
int fdIn = conn.getSocket();
int fdOut = conn.getSocket();
//taken from CheckProtocols (controller_connectors.cpp)
char * argarr[20];
for (int i=0; i<20; i++){argarr[i] = 0;}
int id = -1;
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
unsigned int prots_ctr = prots.getSize();
for (unsigned int i=0; i < prots_ctr; ++i){
if (prots.getIndice(i).getMember("connector").asString() == connector) {
id = i;
break; //pick the first protocol in the list that matches the connector
}
}
if (id == -1) {
DEBUG_MSG(DLVL_ERROR, "No connector found for: %s", connector.c_str());
configLock.post();
configLock.close();
return -1;
}
DEBUG_MSG(DLVL_HIGH, "Connector found: %s", connector.c_str());
//build arguments for starting output process
std::string temphost=conn.getHost();
std::string tempstream=H.GetVar("stream");
std::string debuglevel = JSON::Value((long long)Util::Config::printDebugLevel).asString();
std::string tmparg;
tmparg = Util::getMyPath() + std::string("MistOut") + connector;
struct stat buf;
if (::stat(tmparg.c_str(), &buf) != 0){
tmparg = Util::getMyPath() + std::string("MistConn") + connector;
}
int argnum = 0;
argarr[argnum++] = (char*)tmparg.c_str();
JSON::Value p = prots.getIndice(id).asJSON();
JSON::Value pipedCapa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(connector).asJSON();
configLock.post();
configLock.close();
argarr[argnum++] = (char*)"-i";
argarr[argnum++] = (char*)(temphost.c_str());
argarr[argnum++] = (char*)"-s";
argarr[argnum++] = (char*)(tempstream.c_str());
//set the debug level if non-default
if (Util::Config::printDebugLevel != DEBUG){
argarr[argnum++] = (char*)"--debug";
argarr[argnum++] = (char*)(debuglevel.c_str());
}
if (pipedCapa.isMember("required")){builPipedPart(p, argarr, argnum, pipedCapa["required"]);}
if (pipedCapa.isMember("optional")){builPipedPart(p, argarr, argnum, pipedCapa["optional"]);}
int tempint = fileno(stderr);
///start output process, fdIn and fdOut are connected to conn.sock
Util::Procs::StartPiped(argarr, & fdIn, & fdOut, & tempint);
conn.drop();
return 0;
}
///\brief Determines the type of connector to be used for handling a request.
///\param H The request to be handled..
///\return A string indicating the type of connector.
///Possible values are:
/// - "none" The request is not supported.
/// - "internal" The request should be handled by the proxy itself.
/// - anything else: The request should be dispatched to a connector on the named socket.
std::string proxyGetHandleType(HTTP::Parser & H){
std::string url = H.getUrl();
if (url.length() > 4){
std::string ext = url.substr(url.length() - 4, 4);
if (ext == ".ico"){
return "internal";
}
if (url.length() > 6 && url.substr(url.length() - 5, 5) == ".html"){
return "internal";
}
if (url.length() > 6 && url.substr(url.length() - 5, 5) == ".smil"){
return "internal";
}
}
if (url == "/crossdomain.xml"){
return "internal";
}
if (url == "/clientaccesspolicy.xml"){
return "internal";
}
if (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js"){
return "internal";
}
if (url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js"){
return "internal";
}
//loop over the connectors
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors");
unsigned int capa_ctr = capa.getSize();
for (unsigned int i = 0; i < capa_ctr; ++i){
DTSC::Scan c = capa.getIndice(i);
//if it depends on HTTP and has a match or prefix...
if (c.getMember("deps").asString() == "HTTP" && (c.getMember("url_match") || c.getMember("url_prefix"))){
//if there is a matcher, try to match
if (c.getMember("url_match")){
std::string m = c.getMember("url_match").asString();
size_t found = m.find('$');
if (found != std::string::npos){
if (m.substr(0, found) == url.substr(0, found) && m.substr(found+1) == url.substr(url.size() - (m.size() - found) + 1)){
//it matched - handle it now
std::string streamname = url.substr(found, url.size() - m.size() + 1);
Util::sanitizeName(streamname);
H.SetVar("stream", streamname);
configLock.post();
configLock.close();
return capa.getIndiceName(i);
}
}
}
//if there is a prefix, try to match
if (c.getMember("url_prefix")){
std::string m = c.getMember("url_prefix").asString();
size_t found = m.find('$');
if (found != std::string::npos){
size_t found_suf = url.find(m.substr(found+1), found);
if (m.substr(0, found) == url.substr(0, found) && found_suf != std::string::npos){
//it matched - handle it now
std::string streamname = url.substr(found, found_suf - found);
Util::sanitizeName(streamname);
H.SetVar("stream", streamname);
configLock.post();
configLock.close();
return capa.getIndiceName(i);
}
}
}
}
}
configLock.post();
configLock.close();
return "none";
}
///\brief Function run as a thread to handle a single HTTP connection.
///\param conn A Socket::Connection indicating the connection to th client.
int proxyHandleHTTPConnection(Socket::Connection & conn){
conn.setBlocking(false); //do not block on conn.spool() when no data is available
HTTP::Parser Client;
while (conn.connected()){
//conn.peek reads data without removing it from pipe
if (conn.peek() && Client.Read(conn)){
std::string handler = proxyGetHandleType(Client);
DEBUG_MSG(DLVL_HIGH, "Received request: %s (%d) => %s (%s)", Client.getUrl().c_str(), conn.getSocket(), handler.c_str(), Client.GetVar("stream").c_str());
bool closeConnection = false;
if (Client.GetHeader("Connection") == "close"){
closeConnection = true;
}
if (handler == "none" || handler == "internal"){
Client.Clean();
conn.Received().clear();
conn.spool();
Client.Read(conn);
if (handler == "internal"){
proxyHandleInternal(Client, conn);
}else{
proxyHandleUnsupported(Client, conn);
}
}else{
proxyHandleThroughConnector(Client, conn, handler);
if (conn.connected()){
FAIL_MSG("Request %d (%s) failed - no connector started", conn.getSocket(), handler.c_str());
}
break;
}
DEBUG_MSG(DLVL_HIGH, "Completed request %d (%s) ", conn.getSocket(), handler.c_str());
if (closeConnection){
break;
}
Client.Clean(); //clean for any possible next requests
}else{
Util::sleep(10); //sleep 10ms
}
}
//close and remove the connection
conn.close();
return 0;
}
} //Connector_HTTP namespace
int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
JSON::Value capa;
capa["optional"]["debug"]["name"] = "debug";
capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed.";
capa["optional"]["debug"]["option"] = "--debug";
capa["optional"]["debug"]["type"] = "uint";
capa["desc"] = "Enables the generic HTTP listener, required by all other HTTP protocols. Needs other HTTP protocols enabled to do much of anything.";
capa["deps"] = "";
conf.addConnectorOptions(8080, capa);
conf.parseArgs(argc, argv);
if (conf.getBool("json")){
std::cout << capa.toString() << std::endl;
return -1;
}
Connector_HTTP::serverCfg.init("!mistConfig", 4*1024*1024);
return conf.serveThreadedSocket(Connector_HTTP::proxyHandleHTTPConnection);
}

View file

@ -118,7 +118,7 @@ namespace Controller {
#define connCapa capabilities["connectors"][connName]
if (connCapa.isMember("socket")){
if (connCapa.isMember("socket") || (connCapa.isMember("deps") && connCapa["deps"].asStringRef() == "HTTP")){
( *ait)["online"] = "Enabled";
continue;
}

View file

@ -15,7 +15,13 @@ int main(int argc, char * argv[]) {
std::cout << mistOut::capa.toString() << std::endl;
return -1;
}
conf.serveForkedSocket(spawnForked);
if (mistOut::listenMode()){
conf.serveForkedSocket(spawnForked);
}else{
Socket::Connection S(fileno(stdout),fileno(stdin) );
mistOut tmp(S);
return tmp.run();
}
}
return 0;
}

View file

@ -1,32 +0,0 @@
#include OUTPUTTYPE
#include <mist/config.h>
#include <mist/socket.h>
int main(int argc, char * argv[]) {
Util::Config conf(argv[0], PACKAGE_VERSION);
mistOut::init(&conf);
mistOut::capa["forward"]["streamname"]["name"] = "Stream";
mistOut::capa["forward"]["streamname"]["help"] = "What streamname to serve.";
mistOut::capa["forward"]["streamname"]["type"] = "str";
mistOut::capa["forward"]["streamname"]["option"] = "--stream";
mistOut::capa["forward"]["ip"]["name"] = "IP";
mistOut::capa["forward"]["ip"]["help"] = "IP of forwarded connection.";
mistOut::capa["forward"]["ip"]["type"] = "str";
mistOut::capa["forward"]["ip"]["option"] = "--ip";
conf.addOption("streamname",
JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}"));
conf.addOption("ip",
JSON::fromString("{\"arg\":\"string\",\"short\":\"i\",\"long\":\"ip\",\"help\":\"Ip addr of connection.\"}"));
if (conf.parseArgs(argc, argv)) {
if (conf.getBool("json")) {
std::cout << mistOut::capa.toString() << std::endl;
return -1;
}
Socket::Connection S(fileno(stdout),fileno(stdin) );
mistOut tmp(S);
return tmp.run();
}
return 0;
}

View file

@ -486,29 +486,33 @@ namespace Mist {
return false;
}
}
void Output::requestHandler(){
static bool firstData = true;//only the first time, we call onRequest if there's data buffered already.
if ((firstData && myConn.Received().size()) || myConn.spool()){
firstData = false;
DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
onRequest();
}else{
if (!isBlocking && !parseData){
Util::sleep(500);
}
}
}
int Output::run() {
bool firstData = true;//only the first time, we call OnRequest if there's data buffered already.
DEBUG_MSG(DLVL_MEDIUM, "MistOut client handler started");
while (myConn.connected() && (wantRequest || parseData)){
stats();
if (wantRequest){
if ((firstData && myConn.Received().size()) || myConn.spool()){
firstData = false;
DEBUG_MSG(DLVL_DONTEVEN, "OnRequest");
onRequest();
}else{
if (!isBlocking && !parseData){
Util::sleep(500);
}
}
requestHandler();
}
if (parseData){
if (!isInitialized){
initialize();
}
if ( !sentHeader){
DEBUG_MSG(DLVL_DONTEVEN, "SendHeader");
DEBUG_MSG(DLVL_DONTEVEN, "sendHeader");
sendHeader();
}
prepareNext();

View file

@ -58,6 +58,7 @@ namespace Mist {
void stop();
void setBlocking(bool blocking);
void updateMeta();
static bool listenMode(){return true;}
//virtuals. The optional virtuals have default implementations that do as little as possible.
virtual void sendNext() {}//REQUIRED! Others are optional.
virtual void prepareNext();
@ -66,18 +67,19 @@ namespace Mist {
virtual void initialize();
virtual void sendHeader();
virtual void onFail();
virtual void requestHandler();
private://these *should* not be messed with in child classes.
std::map<unsigned long, unsigned int> currKeyOpen;
void loadPageForKey(long unsigned int trackId, long long int keyNum);
bool isBlocking;///< If true, indicates that myConn is blocking.
unsigned int lastStats;///<Time of last sending of stats.
IPC::sharedClient statsPage;///< Shared memory used for statistics reporting.
long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending.
std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes.
std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets.
std::map<unsigned long, unsigned long> lastKeyTime;///< Stores the time of the last keyframe, for preventing duplicates
bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext().
protected://these are to be messed with by child classes
IPC::sharedClient statsPage;///< Shared memory used for statistics reporting.
bool isBlocking;///< If true, indicates that myConn is blocking.
unsigned int crc;///< Checksum, if any, for usage in the stats.
unsigned int getKeyForTime(long unsigned int trackId, long long timeStamp);
IPC::sharedPage streamIndex;///< Shared memory used for metadata

View file

@ -1,6 +1,4 @@
#include "output_hds.h"
#include <mist/defines.h>
#include <mist/http_parser.h>
#include <mist/stream.h>
#include <unistd.h>
#include <mist/amf.h>
@ -129,30 +127,19 @@ namespace Mist {
return Result.str();
} //BuildManifest
OutHDS::OutHDS(Socket::Connection & conn) : Output(conn) {
OutHDS::OutHDS(Socket::Connection & conn) : HTTPOutput(conn) {
audioTrack = 0;
playUntil = 0;
myConn.setHost(config->getString("ip"));
streamName = config->getString("streamname");
}
void OutHDS::onFail(){
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetBody("Stream not found. Sorry, we tried.");
HTTP_S.SendResponse("404", "Stream not found", myConn);
Output::onFail();
}
OutHDS::~OutHDS() {}
void OutHDS::init(Util::Config * cfg){
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "HDS";
capa["desc"] = "Enables HTTP protocol Adobe-specific dynamic streaming (also known as HDS).";
capa["deps"] = "HTTP";
capa["url_rel"] = "/dynamic/$/manifest.f4m";
capa["url_prefix"] = "/dynamic/$/";
capa["socket"] = "http_hds";
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("H263");
capa["codecs"][0u][0u].append("VP6");
@ -171,8 +158,6 @@ namespace Mist {
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "flash/11";
capa["methods"][0u]["priority"] = 7ll;
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
void OutHDS::sendNext(){
@ -180,106 +165,100 @@ namespace Mist {
DEBUG_MSG(DLVL_DEVEL, "(%d) Done sending fragment", getpid() );
stop();
wantRequest = true;
HTTP_S.Chunkify("", 0, myConn);
H.Chunkify("", 0, myConn);
return;
}
tag.DTSCLoader(currentPacket, myMeta.tracks[currentPacket.getTrackId()]);
HTTP_S.Chunkify(tag.data, tag.len, myConn);
H.Chunkify(tag.data, tag.len, myConn);
}
void OutHDS::onRequest(){
HTTP_R.Clean();
while (HTTP_R.Read(myConn)){
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
DEBUG_MSG(DLVL_DEVEL, "Received request: %s", HTTP_R.getUrl().c_str());
if (HTTP_R.url.find(".abst") != std::string::npos){
initialize();
std::string streamID = HTTP_R.url.substr(streamName.size() + 10);
streamID = streamID.substr(0, streamID.find(".abst"));
HTTP_S.Clean();
HTTP_S.SetBody(dynamicBootstrap(atoll(streamID.c_str())));
HTTP_S.SetHeader("Content-Type", "binary/octet");
HTTP_S.SetHeader("Cache-Control", "no-cache");
HTTP_S.SendResponse("200", "OK", myConn);
HTTP_R.Clean(); //clean for any possible next requests
continue;
void OutHDS::onHTTP(){
if (H.url.find(".abst") != std::string::npos){
initialize();
std::string streamID = H.url.substr(streamName.size() + 10);
streamID = streamID.substr(0, streamID.find(".abst"));
H.Clean();
H.SetBody(dynamicBootstrap(atoll(streamID.c_str())));
H.SetHeader("Content-Type", "binary/octet");
H.SetHeader("Cache-Control", "no-cache");
H.SendResponse("200", "OK", myConn);
H.Clean(); //clean for any possible next requests
return;
}
if (H.url.find("f4m") == std::string::npos){
initialize();
std::string tmp_qual = H.url.substr(H.url.find("/", 10) + 1);
unsigned int tid;
unsigned int fragNum;
tid = atoi(tmp_qual.substr(0, tmp_qual.find("Seg") - 1).c_str());
int temp;
temp = H.url.find("Seg") + 3;
temp = H.url.find("Frag") + 4;
fragNum = atoi(H.url.substr(temp).c_str()) - 1;
DEBUG_MSG(DLVL_MEDIUM, "Video track %d, fragment %d\n", tid, fragNum);
if (!audioTrack){getTracks();}
unsigned int mstime = 0;
unsigned int mslen = 0;
if (fragNum < (unsigned int)myMeta.tracks[tid].missedFrags){
H.Clean();
H.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
H.SendResponse("412", "Fragment out of range", myConn);
H.Clean(); //clean for any possible next requests
std::cout << "Fragment " << fragNum << " too old" << std::endl;
return;
}
if (HTTP_R.url.find("f4m") == std::string::npos){
initialize();
std::string tmp_qual = HTTP_R.url.substr(HTTP_R.url.find("/", 10) + 1);
unsigned int tid;
unsigned int fragNum;
tid = atoi(tmp_qual.substr(0, tmp_qual.find("Seg") - 1).c_str());
int temp;
temp = HTTP_R.url.find("Seg") + 3;
temp = HTTP_R.url.find("Frag") + 4;
fragNum = atoi(HTTP_R.url.substr(temp).c_str()) - 1;
DEBUG_MSG(DLVL_MEDIUM, "Video track %d, fragment %d\n", tid, fragNum);
if (!audioTrack){getTracks();}
unsigned int mstime = 0;
unsigned int mslen = 0;
if (fragNum < (unsigned int)myMeta.tracks[tid].missedFrags){
HTTP_S.Clean();
HTTP_S.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
HTTP_S.SendResponse("412", "Fragment out of range", myConn);
HTTP_R.Clean(); //clean for any possible next requests
std::cout << "Fragment " << fragNum << " too old" << std::endl;
continue;
}
if (fragNum > myMeta.tracks[tid].missedFrags + myMeta.tracks[tid].fragments.size() - 1){
HTTP_S.Clean();
HTTP_S.SetBody("Proxy, re-request this in a second or two.\n");
HTTP_S.SendResponse("208", "Ask again later", myConn);
HTTP_R.Clean(); //clean for any possible next requests
std::cout << "Fragment after fragment " << fragNum << " not available yet" << std::endl;
continue;
}
mstime = myMeta.tracks[tid].getKey(myMeta.tracks[tid].fragments[fragNum - myMeta.tracks[tid].missedFrags].getNumber()).getTime();
mslen = myMeta.tracks[tid].fragments[fragNum - myMeta.tracks[tid].missedFrags].getDuration();
selectedTracks.clear();
selectedTracks.insert(tid);
if (audioTrack){
selectedTracks.insert(audioTrack);
}
seek(mstime);
playUntil = mstime + mslen;
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.StartResponse(HTTP_R, myConn);
//send the bootstrap
std::string bootstrap = dynamicBootstrap(tid);
HTTP_S.Chunkify(bootstrap, myConn);
//send a zero-size mdat, meaning it stretches until end of file.
HTTP_S.Chunkify("\000\000\000\000mdat", 8, myConn);
//send init data, if needed.
if (audioTrack > 0 && myMeta.tracks[audioTrack].init != ""){
if (tag.DTSCAudioInit(myMeta.tracks[audioTrack])){
tag.tagTime(mstime);
HTTP_S.Chunkify(tag.data, tag.len, myConn);
}
}
if (tid > 0){
if (tag.DTSCVideoInit(myMeta.tracks[tid])){
tag.tagTime(mstime);
HTTP_S.Chunkify(tag.data, tag.len, myConn);
}
}
parseData = true;
wantRequest = false;
}else{
initialize();
std::stringstream tmpstr;
myMeta.toPrettyString(tmpstr);
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache");
HTTP_S.SetBody(dynamicIndex());
HTTP_S.SendResponse("200", "OK", myConn);
if (fragNum > myMeta.tracks[tid].missedFrags + myMeta.tracks[tid].fragments.size() - 1){
H.Clean();
H.SetBody("Proxy, re-request this in a second or two.\n");
H.SendResponse("208", "Ask again later", myConn);
H.Clean(); //clean for any possible next requests
std::cout << "Fragment after fragment " << fragNum << " not available yet" << std::endl;
return;
}
HTTP_R.Clean(); //clean for any possible next requests
mstime = myMeta.tracks[tid].getKey(myMeta.tracks[tid].fragments[fragNum - myMeta.tracks[tid].missedFrags].getNumber()).getTime();
mslen = myMeta.tracks[tid].fragments[fragNum - myMeta.tracks[tid].missedFrags].getDuration();
selectedTracks.clear();
selectedTracks.insert(tid);
if (audioTrack){
selectedTracks.insert(audioTrack);
}
seek(mstime);
playUntil = mstime + mslen;
H.Clean();
H.SetHeader("Content-Type", "video/mp4");
H.StartResponse(H, myConn);
//send the bootstrap
std::string bootstrap = dynamicBootstrap(tid);
H.Chunkify(bootstrap, myConn);
//send a zero-size mdat, meaning it stretches until end of file.
H.Chunkify("\000\000\000\000mdat", 8, myConn);
//send init data, if needed.
if (audioTrack > 0 && myMeta.tracks[audioTrack].init != ""){
if (tag.DTSCAudioInit(myMeta.tracks[audioTrack])){
tag.tagTime(mstime);
H.Chunkify(tag.data, tag.len, myConn);
}
}
if (tid > 0){
if (tag.DTSCVideoInit(myMeta.tracks[tid])){
tag.tagTime(mstime);
H.Chunkify(tag.data, tag.len, myConn);
}
}
parseData = true;
wantRequest = false;
}else{
initialize();
std::stringstream tmpstr;
myMeta.toPrettyString(tmpstr);
H.Clean();
H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Cache-Control", "no-cache");
H.SetBody(dynamicIndex());
H.SendResponse("200", "OK", myConn);
}
}
}

View file

@ -1,25 +1,20 @@
#include "output.h"
#include <mist/http_parser.h>
#include "output_http.h"
#include <mist/ts_packet.h>
#include <mist/mp4.h>
#include <mist/mp4_generic.h>
namespace Mist {
class OutHDS : public Output {
class OutHDS : public HTTPOutput {
public:
OutHDS(Socket::Connection & conn);
~OutHDS();
static void init(Util::Config * cfg);
void onRequest();
void onFail();
void onHTTP();
void sendNext();
protected:
void getTracks();
std::string dynamicBootstrap(int tid);
std::string dynamicIndex();
HTTP::Parser HTTP_S;
HTTP::Parser HTTP_R;
std::set<int> videoTracks;///<< Holds valid video tracks for playback
long long int audioTrack;///<< Holds audio track ID for playback
long long unsigned int playUntil;

View file

@ -1,6 +1,4 @@
#include "output_hls.h"
#include <mist/defines.h>
#include <mist/http_parser.h>
#include <mist/stream.h>
#include <unistd.h>
@ -92,39 +90,26 @@ namespace Mist {
} //liveIndex
OutHLS::OutHLS(Socket::Connection & conn) : Output(conn) {
OutHLS::OutHLS(Socket::Connection & conn) : HTTPOutput(conn) {
haveAvcc = false;
realTime = 0;
myConn.setHost(config->getString("ip"));
myConn.setBlocking(true);
streamName = config->getString("streamname");
}
OutHLS::~OutHLS() {}
void OutHLS::onFail(){
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetBody("Stream not found. Sorry, we tried.");
HTTP_S.SendResponse("404", "Stream not found", myConn);
Output::onFail();
}
void OutHLS::init(Util::Config * cfg){
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "HLS";
capa["desc"] = "Enables HTTP protocol Apple-specific streaming (also known as HLS).";
capa["deps"] = "HTTP";
capa["url_rel"] = "/hls/$/index.m3u8";
capa["url_prefix"] = "/hls/$/";
capa["socket"] = "http_hls";
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("MP3");
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "html5/application/vnd.apple.mpegurl";
capa["methods"][0u]["priority"] = 9ll;
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
///this function generates the PMT packet
@ -157,17 +142,15 @@ namespace Mist {
PMT.calcCRC();
return PMT.getStrBuf();
}
void OutHLS::fillPacket(bool & first, const char * data, size_t dataLen, char & ContCounter){
if (!PackData.BytesFree()){
if (PacketNumber % 42 == 0){
HTTP_S.Chunkify(TS::PAT, 188, myConn);
std::string PMT = createPMT();
HTTP_S.Chunkify(PMT, myConn);
H.Chunkify(TS::PAT, 188, myConn);
H.Chunkify(createPMT().c_str(), 188, myConn);
PacketNumber += 2;
}
HTTP_S.Chunkify(PackData.ToString(), 188, myConn);
H.Chunkify(PackData.ToString(), 188, myConn);
PacketNumber ++;
PackData.Clear();
}
@ -197,7 +180,6 @@ namespace Mist {
void OutHLS::sendNext(){
bool first = true;
char * ContCounter = 0;
char * dataPointer = 0;
unsigned int dataLen = 0;
currentPacket.getString("data", dataPointer, dataLen); //data
@ -206,8 +188,8 @@ namespace Mist {
stop();
wantRequest = true;
parseData = false;
HTTP_S.Chunkify("", 0, myConn);
HTTP_S.Clean();
H.Chunkify("", 0, myConn);
H.Clean();
return;
}
@ -256,7 +238,6 @@ namespace Mist {
fillPacket(first, bs.data(), bs.size(), AudioCounter);
bs = TS::GetAudioHeader(dataLen, myMeta.tracks[currentPacket.getTrackId()].init);
fillPacket(first, bs.data(), bs.size(), AudioCounter);
ContCounter = &AudioCounter;
fillPacket(first, dataPointer,dataLen, AudioCounter);
if (PackData.BytesFree() < 184){
PackData.AddStuffing();
@ -284,100 +265,78 @@ namespace Mist {
return 0;
}
void OutHLS::onRequest(){
while (HTTP_R.Read(myConn)){
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
DEBUG_MSG(DLVL_MEDIUM, "Received request: %s", HTTP_R.getUrl().c_str());
if (HTTP_R.url == "/crossdomain.xml"){
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
HTTP_S.SetBody("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><allow-access-from domain=\"*\" /><site-control permitted-cross-domain-policies=\"all\"/></cross-domain-policy>");
HTTP_S.SendResponse("200", "OK", myConn);
HTTP_R.Clean(); //clean for any possible next requests
continue;
} //crossdomain.xml
if (HTTP_R.url.find("hls") == std::string::npos){
myConn.close();
continue;
}
AppleCompat = (HTTP_R.GetHeader("User-Agent").find("Apple") != std::string::npos);
initialize();
if (HTTP_R.url.find(".m3u") == std::string::npos){
std::string tmpStr = HTTP_R.getUrl().substr(5+streamName.size());
long long unsigned int from;
if (sscanf(tmpStr.c_str(), "/%u_%u/%llu_%llu.ts", &vidTrack, &audTrack, &from, &until) != 4){
if (sscanf(tmpStr.c_str(), "/%u/%llu_%llu.ts", &vidTrack, &from, &until) != 3){
DEBUG_MSG(DLVL_MEDIUM, "Could not parse URL: %s", HTTP_R.getUrl().c_str());
HTTP_S.Clean();
HTTP_S.SetBody("The HLS URL wasn't understood - what did you want, exactly?\n");
myConn.SendNow(HTTP_S.BuildResponse("404", "URL mismatch"));
HTTP_R.Clean(); //clean for any possible next requests
continue;
}else{
selectedTracks.clear();
selectedTracks.insert(vidTrack);
}
void OutHLS::onHTTP(){
AppleCompat = (H.GetHeader("User-Agent").find("Apple") != std::string::npos);
initialize();
if (H.url.find(".m3u") == std::string::npos){
std::string tmpStr = H.getUrl().substr(5+streamName.size());
long long unsigned int from;
if (sscanf(tmpStr.c_str(), "/%u_%u/%llu_%llu.ts", &vidTrack, &audTrack, &from, &until) != 4){
if (sscanf(tmpStr.c_str(), "/%u/%llu_%llu.ts", &vidTrack, &from, &until) != 3){
DEBUG_MSG(DLVL_MEDIUM, "Could not parse URL: %s", H.getUrl().c_str());
H.Clean();
H.SetBody("The HLS URL wasn't understood - what did you want, exactly?\n");
myConn.SendNow(H.BuildResponse("404", "URL mismatch"));
H.Clean(); //clean for any possible next requests
return;
}else{
selectedTracks.clear();
selectedTracks.insert(vidTrack);
selectedTracks.insert(audTrack);
}
if (myMeta.live){
/// \todo Detection of out-of-range parts.
int seekable = canSeekms(from);
if (seekable < 0){
HTTP_S.Clean();
HTTP_S.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
myConn.SendNow(HTTP_S.BuildResponse("412", "Fragment out of range"));
HTTP_R.Clean(); //clean for any possible next requests
DEBUG_MSG(DLVL_WARN, "Fragment @ %llu too old", from);
continue;
}
if (seekable > 0){
HTTP_S.Clean();
HTTP_S.SetBody("Proxy, re-request this in a second or two.\n");
myConn.SendNow(HTTP_S.BuildResponse("208", "Ask again later"));
HTTP_R.Clean(); //clean for any possible next requests
DEBUG_MSG(DLVL_WARN, "Fragment @ %llu not available yet", from);
continue;
}
}
seek(from);
lastVid = from * 90;
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp2t");
HTTP_S.StartResponse(HTTP_R, myConn);
PacketNumber = 0;
parseData = true;
wantRequest = false;
}else{
initialize();
std::string request = HTTP_R.url.substr(HTTP_R.url.find("/", 5) + 1);
HTTP_S.Clean();
if (HTTP_R.url.find(".m3u8") != std::string::npos){
HTTP_S.SetHeader("Content-Type", "audio/x-mpegurl");
}else{
HTTP_S.SetHeader("Content-Type", "audio/mpegurl");
}
HTTP_S.SetHeader("Cache-Control", "no-cache");
std::string manifest;
if (request.find("/") == std::string::npos){
manifest = liveIndex();
}else{
int selectId = atoi(request.substr(0,request.find("/")).c_str());
manifest = liveIndex(selectId);
}
HTTP_S.SetBody(manifest);
HTTP_S.SendResponse("200", "OK", myConn);
selectedTracks.clear();
selectedTracks.insert(vidTrack);
selectedTracks.insert(audTrack);
}
HTTP_R.Clean(); //clean for any possible next requests
if (myMeta.live){
/// \todo Detection of out-of-range parts.
int seekable = canSeekms(from);
if (seekable < 0){
H.Clean();
H.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
myConn.SendNow(H.BuildResponse("412", "Fragment out of range"));
H.Clean(); //clean for any possible next requests
DEBUG_MSG(DLVL_WARN, "Fragment @ %llu too old", from);
return;
}
if (seekable > 0){
H.Clean();
H.SetBody("Proxy, re-request this in a second or two.\n");
myConn.SendNow(H.BuildResponse("208", "Ask again later"));
H.Clean(); //clean for any possible next requests
DEBUG_MSG(DLVL_WARN, "Fragment @ %llu not available yet", from);
return;
}
}
seek(from);
lastVid = from * 90;
H.StartResponse(H, myConn);
H.SetHeader("Content-Type", "video/mp2t");
PacketNumber = 0;
parseData = true;
wantRequest = false;
}else{
initialize();
std::string request = H.url.substr(H.url.find("/", 5) + 1);
H.Clean();
if (H.url.find(".m3u8") != std::string::npos){
H.SetHeader("Content-Type", "audio/x-mpegurl");
}else{
H.SetHeader("Content-Type", "audio/mpegurl");
}
H.SetHeader("Cache-Control", "no-cache");
std::string manifest;
if (request.find("/") == std::string::npos){
manifest = liveIndex();
}else{
int selectId = atoi(request.substr(0,request.find("/")).c_str());
manifest = liveIndex(selectId);
}
H.SetBody(manifest);
H.SendResponse("200", "OK", myConn);
}
}
}

View file

@ -1,21 +1,17 @@
#include "output.h"
#include <mist/http_parser.h>
#include "output_http.h"
#include <mist/ts_packet.h>
#include <mist/mp4.h>
#include <mist/mp4_generic.h>
namespace Mist {
class OutHLS : public Output {
class OutHLS : public HTTPOutput {
public:
OutHLS(Socket::Connection & conn);
~OutHLS();
static void init(Util::Config * cfg);
void onRequest();
void onFail();
void onHTTP();
void sendNext();
protected:
HTTP::Parser HTTP_S;
HTTP::Parser HTTP_R;
std::string createPMT();
void fillPacket(bool & first, const char * data, size_t dataLen, char & ContCounter);
std::string liveIndex();

View file

@ -9,8 +9,6 @@
#include <mist/checksum.h>
#include <unistd.h>
///\todo Maybe move to util?
long long unsigned int binToInt(std::string & binary) {
long long int result = 0;
@ -45,22 +43,15 @@ std::string toUTF16(std::string original) {
namespace Mist {
OutHSS::OutHSS(Socket::Connection & conn) : Output(conn) {
realTime = 0;
myConn.setHost(config->getString("ip"));
streamName = config->getString("streamname");
}
OutHSS::~OutHSS() {}
OutHSS::OutHSS(Socket::Connection & conn) : HTTPOutput(conn){realTime = 0;}
OutHSS::~OutHSS(){}
void OutHSS::init(Util::Config * cfg) {
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "HSS";
capa["desc"] = "Enables HTTP protocol Microsoft-specific smooth streaming through silverlight (also known as HSS).";
capa["deps"] = "HTTP";
capa["url_rel"] = "/smooth/$.ism/Manifest";
capa["url_prefix"] = "/smooth/$.ism/";
capa["socket"] = "http_hss";
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][1u].append("AAC");
capa["methods"][0u]["handler"] = "http";
@ -71,31 +62,22 @@ namespace Mist {
capa["methods"][1u]["type"] = "silverlight";
capa["methods"][1u]["priority"] = 1ll;
capa["methods"][1u]["nolive"] = 1;
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
void OutHSS::sendNext() {
if (currentPacket.getTime() >= playUntil) {
stop();
wantRequest = true;
HTTP_S.Chunkify("", 0, myConn);
HTTP_R.Clean();
H.Chunkify("", 0, myConn);
H.Clean();
return;
}
char * dataPointer = 0;
unsigned int len = 0;
currentPacket.getString("data", dataPointer, len);
HTTP_S.Chunkify(dataPointer, len, myConn);
H.Chunkify(dataPointer, len, myConn);
}
void OutHSS::onFail(){
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetBody("Stream not found. Sorry, we tried.");
HTTP_S.SendResponse("404", "Stream not found", myConn);
Output::onFail();
}
int OutHSS::canSeekms(unsigned int ms) {
//no tracks? Frame too new by definition.
if (!myMeta.tracks.size()) {
@ -118,12 +100,11 @@ namespace Mist {
return 0;
}
void OutHSS::sendHeader() {
//We have a non-manifest request, parse it.
std::string Quality = HTTP_R.url.substr(HTTP_R.url.find("TrackID=", 8) + 8);
std::string Quality = H.url.substr(H.url.find("TrackID=", 8) + 8);
Quality = Quality.substr(0, Quality.find(")"));
std::string parseString = HTTP_R.url.substr(HTTP_R.url.find(")/") + 2);
std::string parseString = H.url.substr(H.url.find(")/") + 2);
parseString = parseString.substr(parseString.find("(") + 1);
long long int seekTime = atoll(parseString.substr(0, parseString.find(")")).c_str()) / 10000;
unsigned int tid = atoll(Quality.c_str());
@ -144,20 +125,20 @@ namespace Mist {
}
}
if (seekable < 0){
HTTP_S.Clean();
HTTP_S.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
myConn.SendNow(HTTP_S.BuildResponse("412", "Fragment out of range"));
HTTP_R.Clean(); //clean for any possible next requests
H.Clean();
H.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
myConn.SendNow(H.BuildResponse("412", "Fragment out of range"));
H.Clean(); //clean for any possible next requests
std::cout << "Fragment @ " << seekTime << "ms too old (" << myMeta.tracks[tid].firstms << " - " << myMeta.tracks[tid].lastms << " ms)" << std::endl;
stop();
wantRequest = true;
return;
}
if (seekable > 0){
HTTP_S.Clean();
HTTP_S.SetBody("Proxy, re-request this in a second or two.\n");
myConn.SendNow(HTTP_S.BuildResponse("208", "Ask again later"));
HTTP_R.Clean(); //clean for any possible next requests
H.Clean();
H.SetBody("Proxy, re-request this in a second or two.\n");
myConn.SendNow(H.BuildResponse("208", "Ask again later"));
H.Clean(); //clean for any possible next requests
std::cout << "Fragment @ " << seekTime << "ms not available yet (" << myMeta.tracks[tid].firstms << " - " << myMeta.tracks[tid].lastms << " ms)" << std::endl;
stop();
wantRequest = true;
@ -181,10 +162,10 @@ namespace Mist {
nextIt++;
if (nextIt == myMeta.tracks[tid].keys.end()) {
if (myMeta.live) {
HTTP_S.Clean();
HTTP_S.SetBody("Proxy, re-request this in a second or two.\n");
myConn.SendNow(HTTP_S.BuildResponse("208", "Ask again later"));
HTTP_R.Clean(); //clean for any possible next requests
H.Clean();
H.SetBody("Proxy, re-request this in a second or two.\n");
myConn.SendNow(H.BuildResponse("208", "Ask again later"));
H.Clean(); //clean for any possible next requests
std::cout << "Fragment after fragment @ " << seekTime << " not available yet" << std::endl;
}
}
@ -192,16 +173,16 @@ namespace Mist {
}
partOffset += it->getParts();
}
if (HTTP_R.url == "/") {
if (H.url == "/") {
return; //Don't continue, but continue instead.
}
/*
if (myMeta.live) {
if (mstime == 0 && seekTime > 1){
HTTP_S.Clean();
HTTP_S.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
myConn.SendNow(HTTP_S.BuildResponse("412", "Fragment out of range"));
HTTP_R.Clean(); //clean for any possible next requests
H.Clean();
H.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
myConn.SendNow(H.BuildResponse("412", "Fragment out of range"));
H.Clean(); //clean for any possible next requests
std::cout << "Fragment @ " << seekTime << " too old" << std::endl;
continue;
}
@ -289,15 +270,15 @@ namespace Mist {
traf_box.setContent(trun_box, 1);
moof_box.setContent(traf_box, 1);
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.StartResponse(HTTP_R, myConn);
HTTP_S.Chunkify(moof_box.asBox(), moof_box.boxedSize(), myConn);
H.Clean();
H.SetHeader("Content-Type", "video/mp4");
H.StartResponse(H, myConn);
H.Chunkify(moof_box.asBox(), moof_box.boxedSize(), myConn);
int size = htonl(keySize + 8);
HTTP_S.Chunkify((char *)&size, 4, myConn);
HTTP_S.Chunkify("mdat", 4, myConn);
H.Chunkify((char *)&size, 4, myConn);
H.Chunkify("mdat", 4, myConn);
sentHeader = true;
HTTP_R.Clean();
H.Clean();
}
@ -446,25 +427,21 @@ namespace Mist {
} //smoothIndex
void OutHSS::onRequest() {
sentHeader = false;
while (HTTP_R.Read(myConn)) {
initialize();
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
if (HTTP_R.url.find("Manifest") != std::string::npos) {
//Manifest, direct reply
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache");
std::string manifest = smoothIndex();
HTTP_S.SetBody(manifest);
HTTP_S.SendResponse("200", "OK", myConn);
HTTP_R.Clean();
} else {
parseData = true;
wantRequest = false;
}
void OutHSS::onHTTP() {
initialize();
if (H.url.find("Manifest") != std::string::npos) {
//Manifest, direct reply
H.Clean();
H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Cache-Control", "no-cache");
std::string manifest = smoothIndex();
H.SetBody(manifest);
H.SendResponse("200", "OK", myConn);
H.Clean();
} else {
parseData = true;
wantRequest = false;
sendHeader();
}
}
@ -477,6 +454,4 @@ namespace Mist {
}
}
}

View file

@ -1,21 +1,17 @@
#include "output.h"
#include "output_http.h"
#include <mist/http_parser.h>
namespace Mist {
class OutHSS : public Output {
class OutHSS : public HTTPOutput {
public:
OutHSS(Socket::Connection & conn);
~OutHSS();
static void init(Util::Config * cfg);
void onRequest();
void onHTTP();
void sendNext();
void initialize();
void onFail();
void sendHeader();
protected:
HTTP::Parser HTTP_S;
HTTP::Parser HTTP_R;
JSON::Value encryption;
std::string smoothIndex();
int canSeekms(unsigned int ms);

295
src/output/output_http.cpp Normal file
View file

@ -0,0 +1,295 @@
#include <sys/stat.h>
#include "output_http.h"
#include <mist/stream.h>
#include <mist/checksum.h>
namespace Mist {
HTTPOutput::HTTPOutput(Socket::Connection & conn) : Output(conn) {
if (config->getString("ip").size()){
myConn.setHost(config->getString("ip"));
}
if (config->getString("streamname").size()){
streamName = config->getString("streamname");
}
}
void HTTPOutput::init(Util::Config * cfg){
Output::init(cfg);
capa["deps"] = "HTTP";
capa["forward"]["streamname"]["name"] = "Stream";
capa["forward"]["streamname"]["help"] = "What streamname to serve.";
capa["forward"]["streamname"]["type"] = "str";
capa["forward"]["streamname"]["option"] = "--stream";
capa["forward"]["ip"]["name"] = "IP";
capa["forward"]["ip"]["help"] = "IP of forwarded connection.";
capa["forward"]["ip"]["type"] = "str";
capa["forward"]["ip"]["option"] = "--ip";
cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}"));
cfg->addOption("ip", JSON::fromString("{\"arg\":\"string\",\"short\":\"i\",\"long\":\"ip\",\"help\":\"Ip addr of connection.\"}"));
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
void HTTPOutput::onFail(){
H.Clean(); //make sure no parts of old requests are left in any buffers
H.SetBody("Stream not found. Sorry, we tried.");
H.SendResponse("404", "Stream not found", myConn);
Output::onFail();
}
bool isMatch(const std::string & url, const std::string & m, std::string & streamname){
size_t found = m.find('$');
if (found != std::string::npos){
if (m.substr(0, found) == url.substr(0, found) && m.substr(found+1) == url.substr(url.size() - (m.size() - found) + 1)){
streamname = url.substr(found, url.size() - m.size() + 1);
return true;
}
}
return (url == m);
}
bool isPrefix(const std::string & url, const std::string & m, std::string & streamname){
size_t found = m.find('$');
if (found != std::string::npos){
size_t found_suf = url.find(m.substr(found+1), found);
if (m.substr(0, found) == url.substr(0, found) && found_suf != std::string::npos){
streamname = url.substr(found, found_suf - found);
return true;
}
}
return false;
}
/// - anything else: The request should be dispatched to a connector on the named socket.
std::string HTTPOutput::getHandler(){
std::string url = H.getUrl();
//check the current output first, the most common case
if (capa.isMember("url_match") || capa.isMember("url_prefix")){
bool match = false;
std::string streamname;
//if there is a matcher, try to match
if (capa.isMember("url_match")){
if (capa["url_match"].isArray()){
for (JSON::ArrIter it = capa["url_match"].ArrBegin(); it != capa["url_match"].ArrEnd(); ++it){
match |= isMatch(url, it->asStringRef(), streamname);
}
}
if (capa["url_match"].isString()){
match |= isMatch(url, capa["url_match"].asStringRef(), streamname);
}
}
//if there is a prefix, try to match
if (capa.isMember("url_prefix")){
if (capa["url_prefix"].isArray()){
for (JSON::ArrIter it = capa["url_prefix"].ArrBegin(); it != capa["url_prefix"].ArrEnd(); ++it){
match |= isPrefix(url, it->asStringRef(), streamname);
}
}
if (capa["url_prefix"].isString()){
match |= isPrefix(url, capa["url_prefix"].asStringRef(), streamname);
}
}
if (match){
if (streamname.size()){
Util::sanitizeName(streamname);
H.SetVar("stream", streamname);
}
return capa["name"].asStringRef();
}
}
//loop over the connectors
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
IPC::sharedPage serverCfg("!mistConfig", 4*1024*1024);
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors");
unsigned int capa_ctr = capa.getSize();
for (unsigned int i = 0; i < capa_ctr; ++i){
DTSC::Scan c = capa.getIndice(i);
//if it depends on HTTP and has a match or prefix...
if ((c.getMember("name").asString() == "HTTP" || c.getMember("deps").asString() == "HTTP") && (c.getMember("url_match") || c.getMember("url_prefix"))){
bool match = false;
std::string streamname;
//if there is a matcher, try to match
if (c.getMember("url_match")){
if (c.getMember("url_match").getSize()){
for (unsigned int j = 0; j < c.getMember("url_match").getSize(); ++j){
match |= isMatch(url, c.getMember("url_match").getIndice(j).asString(), streamname);
}
}else{
match |= isMatch(url, c.getMember("url_match").asString(), streamname);
}
}
//if there is a prefix, try to match
if (c.getMember("url_prefix")){
if (c.getMember("url_prefix").getSize()){
for (unsigned int j = 0; j < c.getMember("url_prefix").getSize(); ++j){
match |= isPrefix(url, c.getMember("url_prefix").getIndice(j).asString(), streamname);
}
}else{
match |= isPrefix(url, c.getMember("url_prefix").asString(), streamname);
}
}
if (match){
if (streamname.size()){
Util::sanitizeName(streamname);
H.SetVar("stream", streamname);
}
configLock.post();
configLock.close();
return capa.getIndiceName(i);
}
}
}
configLock.post();
configLock.close();
return "";
}
void HTTPOutput::requestHandler(){
if (myConn.Received().size() && myConn.spool()){
DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
onRequest();
}else{
if (!myConn.Received().size()){
if (myConn.peek() && H.Read(myConn)){
std::string handler = getHandler();
DEBUG_MSG(DLVL_MEDIUM, "Received request: %s => %s (%s)", H.getUrl().c_str(), handler.c_str(), H.GetVar("stream").c_str());
if (!handler.size()){
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
H.SetBody("<!DOCTYPE html><html><head><title>Unsupported Media Type</title></head><body><h1>Unsupported Media Type</h1>The server isn't quite sure what you wanted to receive from it.</body></html>");
H.SendResponse("415", "Unsupported Media Type", myConn);
myConn.close();
return;
}
if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName){
DEBUG_MSG(DLVL_MEDIUM, "Switching from %s (%s) to %s (%s)", capa["name"].asStringRef().c_str(), streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str());
streamName = H.GetVar("stream");
playerConn.finish();
statsPage.finish();
reConnector(handler);
H.Clean();
if (myConn.connected()){
FAIL_MSG("Request failed - no connector started");
myConn.close();
}
return;
}else{
H.Clean();
myConn.Received().clear();
myConn.spool();
DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
onRequest();
}
}else{
H.Clean();
if (myConn.Received().size()){
myConn.Received().clear();
myConn.spool();
DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
onRequest();
}
}
}else{
if (!isBlocking && !parseData){
Util::sleep(500);
}
}
}
}
void HTTPOutput::onRequest(){
while (H.Read(myConn)){
std::string ua = H.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
INFO_MSG("Received request %s", H.getUrl().c_str());
if (H.GetVar("audio") != ""){
selectedTracks.insert(JSON::Value(H.GetVar("audio")).asInt());
}
if (H.GetVar("video") != ""){
selectedTracks.insert(JSON::Value(H.GetVar("video")).asInt());
}
onHTTP();
H.Clean();
}
}
static inline void builPipedPart(JSON::Value & p, char * argarr[], int & argnum, JSON::Value & argset){
for (JSON::ObjIter it = argset.ObjBegin(); it != argset.ObjEnd(); ++it){
if (it->second.isMember("option") && p.isMember(it->first)){
if (it->second.isMember("type")){
if (it->second["type"].asStringRef() == "str" && !p[it->first].isString()){
p[it->first] = p[it->first].asString();
}
if ((it->second["type"].asStringRef() == "uint" || it->second["type"].asStringRef() == "int") && !p[it->first].isInt()){
p[it->first] = JSON::Value(p[it->first].asInt()).asString();
}
}
if (p[it->first].asStringRef().size() > 0){
argarr[argnum++] = (char*)(it->second["option"].c_str());
argarr[argnum++] = (char*)(p[it->first].c_str());
}
}
}
}
///\brief Handles requests by starting a corresponding output process.
///\param H The request to be handled
///\param conn The connection to the client that issued the request.
///\param connector The type of connector to be invoked.
void HTTPOutput::reConnector(std::string & connector){
//taken from CheckProtocols (controller_connectors.cpp)
char * argarr[20];
for (int i=0; i<20; i++){argarr[i] = 0;}
int id = -1;
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
IPC::sharedPage serverCfg("!mistConfig", 4*1024*1024);
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
unsigned int prots_ctr = prots.getSize();
for (unsigned int i=0; i < prots_ctr; ++i){
if (prots.getIndice(i).getMember("connector").asString() == connector) {
id = i;
break; //pick the first protocol in the list that matches the connector
}
}
if (id == -1) {
DEBUG_MSG(DLVL_ERROR, "No connector found for: %s", connector.c_str());
configLock.post();
configLock.close();
return;
}
DEBUG_MSG(DLVL_HIGH, "Connector found: %s", connector.c_str());
//build arguments for starting output process
std::string temphost=myConn.getHost();
std::string debuglevel = JSON::Value((long long)Util::Config::printDebugLevel).asString();
std::string tmparg = Util::getMyPath() + std::string("MistOut") + connector;
int argnum = 0;
argarr[argnum++] = (char*)tmparg.c_str();
JSON::Value p = prots.getIndice(id).asJSON();
JSON::Value pipedCapa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(connector).asJSON();
configLock.post();
configLock.close();
argarr[argnum++] = (char*)"-i";
argarr[argnum++] = (char*)(temphost.c_str());
argarr[argnum++] = (char*)"-s";
argarr[argnum++] = (char*)(streamName.c_str());
//set the debug level if non-default
if (Util::Config::printDebugLevel != DEBUG){
argarr[argnum++] = (char*)"--debug";
argarr[argnum++] = (char*)(debuglevel.c_str());
}
if (pipedCapa.isMember("required")){builPipedPart(p, argarr, argnum, pipedCapa["required"]);}
if (pipedCapa.isMember("optional")){builPipedPart(p, argarr, argnum, pipedCapa["optional"]);}
///start new/better process
execv(argarr[0], argarr);
}
}

22
src/output/output_http.h Normal file
View file

@ -0,0 +1,22 @@
#include <mist/defines.h>
#include <mist/http_parser.h>
#include "output.h"
namespace Mist {
class HTTPOutput : public Output {
public:
HTTPOutput(Socket::Connection & conn);
virtual ~HTTPOutput(){};
static void init(Util::Config * cfg);
void onRequest();
void onFail();
virtual void onHTTP(){};
virtual void requestHandler();
static bool listenMode(){return false;}
void reConnector(std::string & connector);
std::string getHandler();
protected:
HTTP::Parser H;
};
}

View file

@ -0,0 +1,356 @@
#include <sys/stat.h>
#include "output_http_internal.h"
#include <mist/stream.h>
namespace Mist {
OutHTTP::OutHTTP(Socket::Connection & conn) : HTTPOutput(conn){
if (myConn.getSocket() >= 0){
std::string host = myConn.getHost();
dup2(myConn.getSocket(), STDIN_FILENO);
dup2(myConn.getSocket(), STDOUT_FILENO);
myConn.drop();
myConn = Socket::Connection(fileno(stdout),fileno(stdin) );
myConn.setHost(host);
}
}
OutHTTP::~OutHTTP() {}
bool OutHTTP::listenMode(){
INFO_MSG("Listen mode: %s", config->getString("ip").c_str());
return !(config->getString("ip").size());
}
void OutHTTP::init(Util::Config * cfg){
HTTPOutput::init(cfg);
capa.removeMember("deps");
capa["name"] = "HTTP";
capa["desc"] = "Generic HTTP handler, required for all other HTTP-based outputs.";
capa["url_match"].append("/crossdomain.xml");
capa["url_match"].append("/clientaccesspolicy.xml");
capa["url_match"].append("/$.html");
capa["url_match"].append("/$.ico");
capa["url_match"].append("/info_$.js");
capa["url_match"].append("/embed_$.js");
cfg->addConnectorOptions(8080, capa);
}
/// Sorts the JSON::Value objects that hold source information by preference.
struct sourceCompare {
bool operator() (const JSON::Value& lhs, const JSON::Value& rhs) const {
//first compare simultaneous tracks
if (lhs["simul_tracks"].asInt() > rhs["simul_tracks"].asInt()){
//more tracks = higher priority = true.
return true;
}
if (lhs["simul_tracks"].asInt() < rhs["simul_tracks"].asInt()){
//less tracks = lower priority = false
return false;
}
//same amount of tracks - compare "hardcoded" priorities
if (lhs["priority"].asInt() > rhs["priority"].asInt()){
//higher priority = true.
return true;
}
if (lhs["priority"].asInt() < rhs["priority"].asInt()){
//lower priority = false
return false;
}
//same priority - compare total matches
if (lhs["total_matches"].asInt() > rhs["total_matches"].asInt()){
//more matches = higher priority = true.
return true;
}
if (lhs["total_matches"].asInt() < rhs["total_matches"].asInt()){
//less matches = lower priority = false
return false;
}
//also same amount of matches? just compare the URL then.
return lhs["url"].asStringRef() < rhs["url"].asStringRef();
}
};
void addSource(const std::string & rel, std::set<JSON::Value, sourceCompare> & sources, std::string & host, const std::string & port, JSON::Value & conncapa, unsigned int most_simul, unsigned int total_matches){
JSON::Value tmp;
tmp["type"] = conncapa["type"];
tmp["relurl"] = rel;
tmp["priority"] = conncapa["priority"];
tmp["simul_tracks"] = most_simul;
tmp["total_matches"] = total_matches;
tmp["url"] = conncapa["handler"].asStringRef() + "://" + host + ":" + port + rel;
sources.insert(tmp);
}
void addSources(std::string & streamname, const std::string & rel, std::set<JSON::Value, sourceCompare> & sources, std::string & host, const std::string & port, JSON::Value & conncapa, JSON::Value & strmMeta){
unsigned int most_simul = 0;
unsigned int total_matches = 0;
if (conncapa.isMember("codecs") && conncapa["codecs"].size() > 0){
for (JSON::ArrIter it = conncapa["codecs"].ArrBegin(); it != conncapa["codecs"].ArrEnd(); it++){
unsigned int simul = 0;
if ((*it).size() > 0){
for (JSON::ArrIter itb = (*it).ArrBegin(); itb != (*it).ArrEnd(); itb++){
unsigned int matches = 0;
if ((*itb).size() > 0){
for (JSON::ArrIter itc = (*itb).ArrBegin(); itc != (*itb).ArrEnd(); itc++){
for (JSON::ObjIter trit = strmMeta["tracks"].ObjBegin(); trit != strmMeta["tracks"].ObjEnd(); trit++){
if (trit->second["codec"].asStringRef() == (*itc).asStringRef()){
matches++;
total_matches++;
}
}
}
}
if (matches){
simul++;
}
}
}
if (simul > most_simul){
most_simul = simul;
}
}
}
if (conncapa.isMember("methods") && conncapa["methods"].size() > 0){
std::string relurl;
size_t found = rel.find('$');
if (found != std::string::npos){
relurl = rel.substr(0, found) + streamname + rel.substr(found+1);
}else{
relurl = "/";
}
for (JSON::ArrIter it = conncapa["methods"].ArrBegin(); it != conncapa["methods"].ArrEnd(); it++){
if (!strmMeta.isMember("live") || !it->isMember("nolive")){
addSource(relurl, sources, host, port, *it, most_simul, total_matches);
}
}
}
}
void OutHTTP::onHTTP(){
if (H.url == "/crossdomain.xml"){
H.Clean();
H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
H.SetBody("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><allow-access-from domain=\"*\" /><site-control permitted-cross-domain-policies=\"all\"/></cross-domain-policy>");
H.SendResponse("200", "OK", myConn);
return;
} //crossdomain.xml
if (H.url == "/clientaccesspolicy.xml"){
H.Clean();
H.SetHeader("Content-Type", "text/xml");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
H.SetBody("<?xml version=\"1.0\" encoding=\"utf-8\"?><access-policy><cross-domain-access><policy><allow-from http-methods=\"*\" http-request-headers=\"*\"><domain uri=\"*\"/></allow-from><grant-to><resource path=\"/\" include-subpaths=\"true\"/></grant-to></policy></cross-domain-access></access-policy>");
H.SendResponse("200", "OK", myConn);
return;
} //clientaccesspolicy.xml
// send logo icon
if (H.url.length() > 4 && H.url.substr(H.url.length() - 4, 4) == ".ico"){
H.Clean();
#include "../icon.h"
H.SetHeader("Content-Type", "image/x-icon");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
H.SetHeader("Content-Length", icon_len);
H.SendResponse("200", "OK", myConn);
myConn.SendNow((const char*)icon_data, icon_len);
return;
}
// send logo icon
if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".html"){
H.Clean();
H.SetHeader("Content-Type", "text/html");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
H.SetBody("<!DOCTYPE html><html><head><title>Stream "+streamName+"</title><style>BODY{color:white;background:black;}</style></head><body><script src=\"embed_"+streamName+".js\"></script></body></html>");
H.SendResponse("200", "OK", myConn);
return;
}
// send smil MBR index
if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".smil"){
std::string host = H.GetHeader("Host");
if (host.find(':')){
host.resize(host.find(':'));
}
std::string port, url_rel;
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
IPC::sharedPage serverCfg("!mistConfig", 4*1024*1024);
DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember("RTMP");
unsigned int pro_cnt = prtcls.getSize();
for (unsigned int i = 0; i < pro_cnt; ++i){
if (prtcls.getIndice(i).getMember("connector").asString() != "RTMP"){
continue;
}
port = prtcls.getIndice(i).getMember("port").asString();
//get the default port if none is set
if (!port.size()){
port = capa.getMember("optional").getMember("port").getMember("default").asString();
}
//extract url
url_rel = capa.getMember("url_rel").asString();
if (url_rel.find('$')){
url_rel.resize(url_rel.find('$'));
}
}
std::string trackSources;//this string contains all track sources for MBR smil
DTSC::Scan tracks = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(streamName).getMember("meta").getMember("tracks");
unsigned int track_ctr = tracks.getSize();
for (unsigned int i = 0; i < track_ctr; ++i){//for all video tracks
DTSC::Scan trk = tracks.getIndice(i);
if (trk.getMember("type").asString() == "video"){
trackSources += " <video src='"+ streamName + "?track=" + trk.getMember("trackid").asString() + "' height='" + trk.getMember("height").asString() + "' system-bitrate='" + trk.getMember("bps").asString() + "' width='" + trk.getMember("width").asString() + "' />\n";
}
}
configLock.post();
configLock.close();
H.Clean();
H.SetHeader("Content-Type", "application/smil");
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.SetBody("<smil>\n <head>\n <meta base='rtmp://" + host + ":" + port + url_rel + "' />\n </head>\n <body>\n <switch>\n"+trackSources+" </switch>\n </body>\n</smil>");
H.SendResponse("200", "OK", myConn);
return;
}
if ((H.url.length() > 9 && H.url.substr(0, 6) == "/info_" && H.url.substr(H.url.length() - 3, 3) == ".js") || (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js")){
std::string response;
std::string host = H.GetHeader("Host");
if (host.find(':') != std::string::npos){
host.resize(host.find(':'));
}
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
H.SetHeader("Content-Type", "application/javascript");
response = "// Generating info code for stream " + streamName + "\n\nif (!mistvideo){var mistvideo = {};}\n";
JSON::Value json_resp;
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
IPC::semaphore metaLocker(std::string("liveMeta@" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
bool metaLock = false;
configLock.wait();
IPC::sharedPage serverCfg("!mistConfig", 4*1024*1024);
DTSC::Scan strm = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(streamName).getMember("meta");
IPC::sharedPage streamIndex;
if (!strm){
configLock.post();
//Stream metadata not found - attempt to start it
if (Util::startInput(streamName)){
streamIndex.init(streamName, 8 * 1024 * 1024);
if (streamIndex.mapped){
metaLock = true;
metaLocker.wait();
strm = DTSC::Packet(streamIndex.mapped, streamIndex.len, true).getScan();
}
}
if (!strm){
//stream failed to start or isn't configured
response += "// Stream isn't configured and/or couldn't be started. Sorry.\n";
}
configLock.wait();
}
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
if (strm && prots){
DTSC::Scan trcks = strm.getMember("tracks");
unsigned int trcks_ctr = trcks.getSize();
for (unsigned int i = 0; i < trcks_ctr; ++i){
if (trcks.getIndice(i).getMember("width").asInt() > json_resp["width"].asInt()){
json_resp["width"] = trcks.getIndice(i).getMember("width").asInt();
}
if (trcks.getIndice(i).getMember("height").asInt() > json_resp["height"].asInt()){
json_resp["height"] = trcks.getIndice(i).getMember("height").asInt();
}
}
if (json_resp["width"].asInt() < 1 || json_resp["height"].asInt() < 1){
json_resp["width"] = 640ll;
json_resp["height"] = 480ll;
}
if (strm.getMember("vod")){
json_resp["type"] = "vod";
}
if (strm.getMember("live")){
json_resp["type"] = "live";
}
// show ALL the meta datas!
json_resp["meta"] = strm.asJSON();
for (JSON::ObjIter it = json_resp["meta"]["tracks"].ObjBegin(); it != json_resp["meta"]["tracks"].ObjEnd(); ++it){
it->second.removeMember("fragments");
it->second.removeMember("keys");
it->second.removeMember("parts");
}
//create a set for storing source information
std::set<JSON::Value, sourceCompare> sources;
//find out which connectors are enabled
std::set<std::string> conns;
unsigned int prots_ctr = prots.getSize();
for (unsigned int i = 0; i < prots_ctr; ++i){
conns.insert(prots.getIndice(i).getMember("connector").asString());
}
//loop over the connectors.
for (unsigned int i = 0; i < prots_ctr; ++i){
std::string cName = prots.getIndice(i).getMember("connector").asString();
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName);
//if the connector has a port,
if (capa.getMember("optional").getMember("port")){
//get the default port if none is set
std::string port = prots.getIndice(i).getMember("port").asString();
if (!port.size()){
port = capa.getMember("optional").getMember("port").getMember("default").asString();
}
//and a URL - then list the URL
if (capa.getMember("url_rel")){
JSON::Value capa_json = capa.asJSON();
addSources(streamName, capa.getMember("url_rel").asString(), sources, host, port, capa_json, json_resp["meta"]);
}
//check each enabled protocol separately to see if it depends on this connector
DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors");
unsigned int capa_lst_ctr = capa_lst.getSize();
for (unsigned int j = 0; j < capa_lst_ctr; ++j){
//if it depends on this connector and has a URL, list it
if (conns.count(capa_lst.getIndiceName(j)) && (capa_lst.getIndice(j).getMember("deps").asString() == cName || capa_lst.getIndice(j).getMember("deps").asString() + ".exe" == cName) && capa_lst.getIndice(j).getMember("methods")){
JSON::Value capa_json = capa_lst.getIndice(j).asJSON();
addSources(streamName, capa_lst.getIndice(j).getMember("url_rel").asString(), sources, host, port, capa_json, json_resp["meta"]);
}
}
}
}
//loop over the added sources, add them to json_resp["sources"]
for (std::set<JSON::Value, sourceCompare>::iterator it = sources.begin(); it != sources.end(); it++){
if ((*it)["simul_tracks"].asInt() > 0){
json_resp["source"].append(*it);
}
}
}else{
json_resp["error"] = "The specified stream is not available on this server.";
}
if (metaLock){
metaLocker.post();
}
configLock.post();
configLock.close();
#include "../embed.js.h"
response += "mistvideo['" + streamName + "'] = " + json_resp.toString() + ";\n";
if (H.url.substr(0, 6) != "/info_" && !json_resp.isMember("error")){
response.append("\n(");
if (embed_js[embed_js_len - 2] == ';'){//check if we have a trailing ;\n or just \n
response.append((char*)embed_js, (size_t)embed_js_len - 2); //remove trailing ";\n" from xxd conversion
}else{
response.append((char*)embed_js, (size_t)embed_js_len - 1); //remove trailing "\n" from xxd conversion
}
response.append("(\"" + streamName + "\"));\n");
}
H.SetBody(response);
H.SendResponse("200", "OK", myConn);
return;
} //embed code generator
}
}

View file

@ -0,0 +1,15 @@
#include "output_http.h"
namespace Mist {
class OutHTTP : public HTTPOutput {
public:
OutHTTP(Socket::Connection & conn);
~OutHTTP();
static void init(Util::Config * cfg);
static bool listenMode();
void onHTTP();
};
}
typedef Mist::OutHTTP mistOut;

View file

@ -5,27 +5,17 @@
#include <unistd.h>
namespace Mist {
OutHTTPTS::OutHTTPTS(Socket::Connection & conn) : Output(conn) {
OutHTTPTS::OutHTTPTS(Socket::Connection & conn) : HTTPOutput(conn) {
haveAvcc = false;
myConn.setHost(config->getString("ip"));
myConn.setBlocking(true);
streamName = config->getString("streamname");
}
OutHTTPTS::~OutHTTPTS() {}
void OutHTTPTS::onFail(){
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetBody("Stream not found. Sorry, we tried.");
HTTP_S.SendResponse("404", "Stream not found", myConn);
Output::onFail();
}
void OutHTTPTS::init(Util::Config * cfg){
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "HTTPTS";
capa["desc"] = "Enables HTTP protocol MPEG2/TS pseudostreaming.";
capa["deps"] = "HTTP";
capa["url_rel"] = "/$.ts";
capa["url_match"] = "/$.ts";
capa["socket"] = "http_ts";
@ -35,8 +25,6 @@ namespace Mist {
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "html5/video/mp2t";
capa["methods"][0u]["priority"] = 1ll;
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
///this function generates the PMT packet
@ -73,12 +61,12 @@ namespace Mist {
void OutHTTPTS::fillPacket(bool & first, const char * data, size_t dataLen, char & ContCounter){
if (!PackData.BytesFree()){
if (PacketNumber % 42 == 0){
HTTP_S.Chunkify(TS::PAT, 188, myConn);
H.Chunkify(TS::PAT, 188, myConn);
std::string PMT = createPMT();
HTTP_S.Chunkify(PMT, myConn);
H.Chunkify(PMT, myConn);
PacketNumber += 2;
}
HTTP_S.Chunkify(PackData.ToString(), 188, myConn);
H.Chunkify(PackData.ToString(), 188, myConn);
PacketNumber ++;
PackData.Clear();
}
@ -152,19 +140,15 @@ namespace Mist {
}
}
void OutHTTPTS::onRequest(){
while (HTTP_R.Read(myConn)){
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
DEBUG_MSG(DLVL_MEDIUM, "Received request: %s", HTTP_R.getUrl().c_str());
initialize();
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp2t");
HTTP_S.StartResponse(HTTP_R, myConn);
PacketNumber = 0;
parseData = true;
wantRequest = false;
HTTP_R.Clean(); //clean for any possible next requests
}
void OutHTTPTS::onHTTP(){
initialize();
H.Clean();
H.SetHeader("Content-Type", "video/mp2t");
H.StartResponse(H, myConn);
PacketNumber = 0;
parseData = true;
wantRequest = false;
H.Clean(); //clean for any possible next requests
}
}

View file

@ -1,21 +1,18 @@
#include "output.h"
#include "output_http.h"
#include <mist/http_parser.h>
#include <mist/ts_packet.h>
#include <mist/mp4.h>
#include <mist/mp4_generic.h>
namespace Mist {
class OutHTTPTS : public Output {
class OutHTTPTS : public HTTPOutput {
public:
OutHTTPTS(Socket::Connection & conn);
~OutHTTPTS();
static void init(Util::Config * cfg);
void onRequest();
void onFail();
void onHTTP();
void sendNext();
protected:
HTTP::Parser HTTP_S;
HTTP::Parser HTTP_R;
std::string createPMT();
void fillPacket(bool & first, const char * data, size_t dataLen, char & ContCounter);
int keysToSend;

View file

@ -1,30 +1,18 @@
#include "output_json.h"
#include <mist/http_parser.h>
#include <mist/defines.h>
#include <mist/checksum.h>
#include <iomanip>
namespace Mist {
OutJSON::OutJSON(Socket::Connection & conn) : Output(conn){
realTime = 0;
myConn.setHost(config->getString("ip"));
streamName = config->getString("streamname");
}
OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){realTime = 0;}
OutJSON::~OutJSON() {}
void OutJSON::init(Util::Config * cfg){
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "JSON";
capa["desc"] = "Enables HTTP protocol JSON streaming.";
capa["deps"] = "HTTP";
capa["url_rel"] = "/$.json";
capa["url_match"] = "/$.json";
capa["url_handler"] = "http";
capa["url_type"] = "json";
capa["socket"] = "http_json";
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
void OutJSON::sendNext(){
@ -42,11 +30,9 @@ namespace Mist {
}
void OutJSON::sendHeader(){
HTTP::Parser HTTP_S;
FLV::Tag tag;
HTTP_S.SetHeader("Content-Type", "text/javascript");
HTTP_S.protocol = "HTTP/1.0";
myConn.SendNow(HTTP_S.BuildResponse("200", "OK"));
H.SetHeader("Content-Type", "text/javascript");
H.protocol = "HTTP/1.0";
H.SendResponse("200", "OK", myConn);
sentHeader = true;
}
@ -59,31 +45,20 @@ namespace Mist {
return false;
}
void OutJSON::onRequest(){
HTTP::Parser HTTP_R;
while (HTTP_R.Read(myConn)){
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
DEBUG_MSG(DLVL_DEVEL, "Received request %s", HTTP_R.getUrl().c_str());
first = true;
jsonp = "";
if (HTTP_R.GetVar("callback") != ""){
jsonp = HTTP_R.GetVar("callback");
void OutJSON::onHTTP(){
first = true;
jsonp = "";
if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");}
if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");}
initialize();
for (std::map<int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "meta" ){
selectedTracks.insert(it->first);
}
if (HTTP_R.GetVar("jsonp") != ""){
jsonp = HTTP_R.GetVar("jsonp");
}
initialize();
for (std::map<int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "meta" ){
selectedTracks.insert(it->first);
}
}
seek(0);
parseData = true;
wantRequest = false;
HTTP_R.Clean();
}
seek(0);
parseData = true;
wantRequest = false;
}
}

View file

@ -1,13 +1,13 @@
#include "output.h"
#include "output_http.h"
namespace Mist {
class OutJSON : public Output {
class OutJSON : public HTTPOutput {
public:
OutJSON(Socket::Connection & conn);
~OutJSON();
static void init(Util::Config * cfg);
void onRequest();
void onHTTP();
bool onFinish();
void sendNext();
void sendHeader();

View file

@ -1,24 +1,15 @@
#include "output_progressive_flv.h"
#include <mist/checksum.h>
#include <mist/http_parser.h>
#include <mist/defines.h>
namespace Mist {
OutProgressiveFLV::OutProgressiveFLV(Socket::Connection & conn) : Output(conn) {
myConn.setHost(config->getString("ip"));
streamName = config->getString("streamname");
}
OutProgressiveFLV::OutProgressiveFLV(Socket::Connection & conn) : HTTPOutput(conn){}
OutProgressiveFLV::~OutProgressiveFLV() {}
void OutProgressiveFLV::init(Util::Config * cfg){
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "FLV";
capa["desc"] = "Enables HTTP protocol progressive streaming.";
capa["deps"] = "HTTP";
capa["url_rel"] = "/$.flv";
capa["url_match"] = "/$.flv";
capa["socket"] = "http_progressive_flv";
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("H263");
capa["codecs"][0u][0u].append("VP6");
@ -37,69 +28,34 @@ namespace Mist {
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "flash/7";
capa["methods"][0u]["priority"] = 5ll;
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
void OutProgressiveFLV::sendNext(){
FLV::Tag tag;
bool tmp = tag.DTSCLoader(currentPacket, myMeta.tracks[currentPacket.getTrackId()]);
if (!tmp){
DEBUG_MSG(DLVL_DEVEL, "Invalid JSON");
}
tag.DTSCLoader(currentPacket, myMeta.tracks[currentPacket.getTrackId()]);
myConn.SendNow(tag.data, tag.len);
}
void OutProgressiveFLV::sendHeader(){
HTTP::Parser HTTP_S;
FLV::Tag tag;
HTTP_S.SetHeader("Content-Type", "video/x-flv");
HTTP_S.protocol = "HTTP/1.0";
myConn.SendNow(HTTP_S.BuildResponse("200", "OK"));
H.Clean();
H.SetHeader("Content-Type", "video/x-flv");
H.protocol = "HTTP/1.0";
H.SendResponse("200", "OK", myConn);
myConn.SendNow(FLV::Header, 13);
tag.DTSCMetaInit(myMeta, selectedTracks);
myConn.SendNow(tag.data, tag.len);
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks[*it].type == "video"){
if (tag.DTSCVideoInit(myMeta.tracks[*it])){
myConn.SendNow(tag.data, tag.len);
}
if (myMeta.tracks[*it].type == "video" && tag.DTSCVideoInit(myMeta.tracks[*it])){
myConn.SendNow(tag.data, tag.len);
}
if (myMeta.tracks[*it].type == "audio"){
if (tag.DTSCAudioInit(myMeta.tracks[*it])){
myConn.SendNow(tag.data, tag.len);
}
if (myMeta.tracks[*it].type == "audio" && tag.DTSCAudioInit(myMeta.tracks[*it])){
myConn.SendNow(tag.data, tag.len);
}
}
sentHeader = true;
}
void OutProgressiveFLV::onFail(){
HTTP::Parser HTTP_S;
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetBody("Stream not found. Sorry, we tried.");
HTTP_S.SendResponse("404", "Stream not found", myConn);
Output::onFail();
void OutProgressiveFLV::onHTTP(){
parseData = true;
wantRequest = false;
}
void OutProgressiveFLV::onRequest(){
HTTP::Parser HTTP_R;
while (HTTP_R.Read(myConn)){
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
DEBUG_MSG(DLVL_DEVEL, "Received request %s", HTTP_R.getUrl().c_str());
if (HTTP_R.GetVar("audio") != ""){
selectedTracks.insert(JSON::Value(HTTP_R.GetVar("audio")).asInt());
}
if (HTTP_R.GetVar("video") != ""){
selectedTracks.insert(JSON::Value(HTTP_R.GetVar("video")).asInt());
}
parseData = true;
wantRequest = false;
HTTP_R.Clean();
}
}
}

View file

@ -1,17 +1,17 @@
#include "output.h"
#include "output_http.h"
namespace Mist {
class OutProgressiveFLV : public Output {
class OutProgressiveFLV : public HTTPOutput {
public:
OutProgressiveFLV(Socket::Connection & conn);
~OutProgressiveFLV();
static void init(Util::Config * cfg);
void onRequest();
void onHTTP();
void sendNext();
void onFail();
void sendHeader();
protected:
private:
FLV::Tag tag;
};
}

View file

@ -1,31 +1,19 @@
#include "output_progressive_mp3.h"
#include <mist/http_parser.h>
#include <mist/defines.h>
#include <mist/checksum.h>
namespace Mist {
OutProgressiveMP3::OutProgressiveMP3(Socket::Connection & conn) : Output(conn) {
myConn.setHost(config->getString("ip"));
streamName = config->getString("streamname");
}
OutProgressiveMP3::~OutProgressiveMP3() {}
OutProgressiveMP3::OutProgressiveMP3(Socket::Connection & conn) : HTTPOutput(conn){}
OutProgressiveMP3::~OutProgressiveMP3(){}
void OutProgressiveMP3::init(Util::Config * cfg){
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "MP3";
capa["desc"] = "Enables HTTP protocol progressive streaming.";
capa["deps"] = "HTTP";
capa["url_rel"] = "/$.mp3";
capa["url_match"] = "/$.mp3";
capa["socket"] = "http_progressive_mp3";
capa["codecs"][0u][0u].append("MP3");
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "html5/audio/mp3";
capa["methods"][0u]["priority"] = 8ll;
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
void OutProgressiveMP3::sendNext(){
@ -36,35 +24,15 @@ namespace Mist {
}
void OutProgressiveMP3::sendHeader(){
HTTP::Parser HTTP_S;
FLV::Tag tag;
HTTP_S.SetHeader("Content-Type", "audio/mpeg");
HTTP_S.protocol = "HTTP/1.0";
myConn.SendNow(HTTP_S.BuildResponse("200", "OK"));
H.SetHeader("Content-Type", "audio/mpeg");
H.protocol = "HTTP/1.0";
H.SendResponse("200", "OK", myConn);
sentHeader = true;
}
void OutProgressiveMP3::onFail(){
HTTP::Parser HTTP_S;
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetBody("Stream not found. Sorry, we tried.");
HTTP_S.SendResponse("404", "Stream not found", myConn);
Output::onFail();
}
void OutProgressiveMP3::onRequest(){
HTTP::Parser HTTP_R;
while (HTTP_R.Read(myConn)){
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
DEBUG_MSG(DLVL_DEVEL, "Received request %s", HTTP_R.getUrl().c_str());
if (HTTP_R.GetVar("audio") != ""){
selectedTracks.insert(JSON::Value(HTTP_R.GetVar("audio")).asInt());
}
parseData = true;
wantRequest = false;
HTTP_R.Clean();
}
void OutProgressiveMP3::onHTTP(){
parseData = true;
wantRequest = false;
}
}

View file

@ -1,17 +1,15 @@
#include "output.h"
#include "output_http.h"
namespace Mist {
class OutProgressiveMP3 : public Output {
class OutProgressiveMP3 : public HTTPOutput {
public:
OutProgressiveMP3(Socket::Connection & conn);
~OutProgressiveMP3();
static void init(Util::Config * cfg);
void onRequest();
void onHTTP();
void sendNext();
void onFail();
void sendHeader();
protected:
};
}

View file

@ -5,21 +5,15 @@
#include <mist/checksum.h>
namespace Mist {
OutProgressiveMP4::OutProgressiveMP4(Socket::Connection & conn) : Output(conn) {
myConn.setHost(config->getString("ip"));
streamName = config->getString("streamname");
}
OutProgressiveMP4::OutProgressiveMP4(Socket::Connection & conn) : HTTPOutput(conn){}
OutProgressiveMP4::~OutProgressiveMP4() {}
void OutProgressiveMP4::init(Util::Config * cfg){
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "MP4";
capa["desc"] = "Enables HTTP protocol progressive streaming.";
capa["deps"] = "HTTP";
capa["url_rel"] = "/$.mp4";
capa["url_match"] = "/$.mp4";
capa["socket"] = "http_progressive_mp4";
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("MP3");
@ -27,10 +21,6 @@ namespace Mist {
capa["methods"][0u]["type"] = "html5/video/mp4";
capa["methods"][0u]["priority"] = 8ll;
capa["methods"][0u]["nolive"] = 1;
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
std::string OutProgressiveMP4::DTSCMeta2MP4Header(long long & size){
@ -420,39 +410,75 @@ namespace Mist {
}
}
void OutProgressiveMP4::onRequest(){
if (HTTP_R.Read(myConn)){
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
DEBUG_MSG(DLVL_MEDIUM, "Received request: %s", HTTP_R.getUrl().c_str());
if (HTTP_R.GetVar("audio") != ""){
selectedTracks.insert(JSON::Value(HTTP_R.GetVar("audio")).asInt());
}
if (HTTP_R.GetVar("video") != ""){
selectedTracks.insert(JSON::Value(HTTP_R.GetVar("video")).asInt());
}
parseData = true;
wantRequest = false;
sentHeader = false;
void OutProgressiveMP4::onHTTP(){
initialize();
parseData = true;
wantRequest = false;
sentHeader = false;
fileSize = 0;
std::string headerData = DTSCMeta2MP4Header(fileSize);
byteStart = 0;
byteEnd = fileSize - 1;
seekPoint = 0;
char rangeType = ' ';
currPos = 0;
sortSet.clear();
for (std::set<long unsigned int>::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) {
keyPart temp;
temp.trackID = *subIt;
temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame
temp.endTime = myMeta.tracks[*subIt].firstms + myMeta.tracks[*subIt].parts[0].getDuration();
temp.size = myMeta.tracks[*subIt].parts[0].getSize();//bytesize of frame (alle parts all together)
temp.index = 0;
sortSet.insert(temp);
}
}
/*
bool OutProgressiveMP4::onFinish(){
//HTTP_S.Chunkify("", myConn);
HTTP_R.Clean();
parseData = false;
wantRequest = true;
return true;
}
*/
void OutProgressiveMP4::onFail(){
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetBody("Stream not found. Sorry, we tried.");
HTTP_S.SendResponse("404", "Stream not found", myConn);
Output::onFail();
if (H.GetHeader("Range") != ""){
parseRange(H.GetHeader("Range"), byteStart, byteEnd, seekPoint, headerData.size());
rangeType = H.GetHeader("Range")[0];
}
H.Clean(); //make sure no parts of old requests are left in any buffers
H.SetHeader("Content-Type", "video/MP4"); //Send the correct content-type for MP4 files
H.SetHeader("Accept-Ranges", "bytes, parsec");
if (rangeType != ' '){
if (!byteEnd){
if (rangeType == 'p'){
H.SetBody("Starsystem not in communications range");
H.SendResponse("416", "Starsystem not in communications range", myConn);
return;
}else{
H.SetBody("Requested Range Not Satisfiable");
H.SendResponse("416", "Requested Range Not Satisfiable", myConn);
return;
}
}else{
std::stringstream rangeReply;
rangeReply << "bytes " << byteStart << "-" << byteEnd << "/" << fileSize;
H.SetHeader("Content-Length", byteEnd - byteStart + 1);
//do not multiplex requests that are > 1MiB
if (byteEnd - byteStart + 1 > 1024*1024){
H.SetHeader("MistMultiplex", "No");
}
H.SetHeader("Content-Range", rangeReply.str());
/// \todo Switch to chunked?
H.SendResponse("206", "Partial content", myConn);
//H.StartResponse("206", "Partial content", HTTP_R, conn);
}
}else{
H.SetHeader("Content-Length", byteEnd - byteStart + 1);
//do not multiplex requests that aren't ranged
H.SetHeader("MistMultiplex", "No");
/// \todo Switch to chunked?
H.SendResponse("200", "OK", myConn);
//HTTP_S.StartResponse(HTTP_R, conn);
}
leftOver = byteEnd - byteStart + 1;//add one byte, because range "0-0" = 1 byte of data
if (byteStart < (long long)headerData.size()){
/// \todo Switch to chunked?
//H.Chunkify(headerData.data()+byteStart, std::min((long long)headerData.size(), byteEnd) - byteStart, conn);//send MP4 header
myConn.SendNow(headerData.data()+byteStart, std::min((long long)headerData.size(), byteEnd) - byteStart);//send MP4 header
leftOver -= std::min((long long)headerData.size(), byteEnd) - byteStart;
}
currPos += headerData.size();//we're now guaranteed to be past the header point, no matter what
}
void OutProgressiveMP4::sendNext(){
@ -462,7 +488,7 @@ namespace Mist {
currentPacket.getString("data", dataPointer, len);
if ((unsigned long)currentPacket.getTrackId() != sortSet.begin()->trackID || currentPacket.getTime() != sortSet.begin()->time){
if (perfect){
DEBUG_MSG(DLVL_WARN, "Warning: input is inconsistent, playback may not be perfect");
DEBUG_MSG(DLVL_WARN, "Warning: input is inconsistent. Expected %lu:%llu but got %ld:%llu", sortSet.begin()->trackID, sortSet.begin()->time, currentPacket.getTrackId(), currentPacket.getTime());
perfect = false;
}
}
@ -485,7 +511,7 @@ namespace Mist {
if (currPos >= byteStart){
myConn.SendNow(dataPointer, std::min(leftOver, (long long)len));
//HTTP_S.Chunkify(Strm.lastData().data(), Strm.lastData().size(), conn);
//H.Chunkify(Strm.lastData().data(), Strm.lastData().size(), conn);
leftOver -= len;
}else{
if (currPos + (long long)len > byteStart){
@ -503,70 +529,6 @@ namespace Mist {
}
void OutProgressiveMP4::sendHeader(){
fileSize = 0;
std::string headerData = DTSCMeta2MP4Header(fileSize);
byteStart = 0;
byteEnd = fileSize - 1;
long long seekPoint = 0;
char rangeType = ' ';
currPos = 0;
sortSet.clear();
for (std::set<long unsigned int>::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) {
keyPart temp;
temp.trackID = *subIt;
temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame
temp.endTime = myMeta.tracks[*subIt].firstms + myMeta.tracks[*subIt].parts[0].getDuration();
temp.size = myMeta.tracks[*subIt].parts[0].getSize();//bytesize of frame (alle parts all together)
temp.index = 0;
sortSet.insert(temp);
}
if (HTTP_R.GetHeader("Range") != ""){
parseRange(HTTP_R.GetHeader("Range"), byteStart, byteEnd, seekPoint, headerData.size());
rangeType = HTTP_R.GetHeader("Range")[0];
}
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetHeader("Content-Type", "video/MP4"); //Send the correct content-type for MP4 files
HTTP_S.SetHeader("Accept-Ranges", "bytes, parsec");
if (rangeType != ' '){
if (!byteEnd){
if (rangeType == 'p'){
HTTP_S.SetBody("Starsystem not in communications range");
HTTP_S.SendResponse("416", "Starsystem not in communications range", myConn);
return;
}else{
HTTP_S.SetBody("Requested Range Not Satisfiable");
HTTP_S.SendResponse("416", "Requested Range Not Satisfiable", myConn);
return;
}
}else{
std::stringstream rangeReply;
rangeReply << "bytes " << byteStart << "-" << byteEnd << "/" << fileSize;
HTTP_S.SetHeader("Content-Length", byteEnd - byteStart + 1);
//do not multiplex requests that are > 1MiB
if (byteEnd - byteStart + 1 > 1024*1024){
HTTP_S.SetHeader("MistMultiplex", "No");
}
HTTP_S.SetHeader("Content-Range", rangeReply.str());
/// \todo Switch to chunked?
HTTP_S.SendResponse("206", "Partial content", myConn);
//HTTP_S.StartResponse("206", "Partial content", HTTP_R, conn);
}
}else{
HTTP_S.SetHeader("Content-Length", byteEnd - byteStart + 1);
//do not multiplex requests that aren't ranged
HTTP_S.SetHeader("MistMultiplex", "No");
/// \todo Switch to chunked?
HTTP_S.SendResponse("200", "OK", myConn);
//HTTP_S.StartResponse(HTTP_R, conn);
}
leftOver = byteEnd - byteStart + 1;//add one byte, because range "0-0" = 1 byte of data
if (byteStart < (long long)headerData.size()){
/// \todo Switch to chunked?
//HTTP_S.Chunkify(headerData.data()+byteStart, std::min((long long)headerData.size(), byteEnd) - byteStart, conn);//send MP4 header
myConn.SendNow(headerData.data()+byteStart, std::min((long long)headerData.size(), byteEnd) - byteStart);//send MP4 header
leftOver -= std::min((long long)headerData.size(), byteEnd) - byteStart;
}
currPos += headerData.size();//we're now guaranteed to be past the header point, no matter what
seek(seekPoint);
sentHeader = true;
}

View file

@ -1,4 +1,4 @@
#include "output.h"
#include "output_http.h"
#include <mist/http_parser.h>
namespace Mist {
@ -22,7 +22,7 @@ namespace Mist {
long unsigned int index;
};
class OutProgressiveMP4 : public Output {
class OutProgressiveMP4 : public HTTPOutput {
public:
OutProgressiveMP4(Socket::Connection & conn);
~OutProgressiveMP4();
@ -30,20 +30,17 @@ namespace Mist {
void parseRange(std::string header, long long & byteStart, long long & byteEnd, long long & seekPoint, unsigned int headerSize);
std::string DTSCMeta2MP4Header(long long & size);
void findSeekPoint(long long byteStart, long long & seekPoint, unsigned int headerSize);
void onRequest();
void onHTTP();
void sendNext();
//bool onFinish();
void sendHeader();
void onFail();
protected:
long long fileSize;
long long byteStart;
long long byteEnd;
long long leftOver;
long long currPos;
long long seekPoint;
std::set <keyPart> sortSet;//filling sortset for interleaving parts
HTTP::Parser HTTP_R, HTTP_S;
};
}

View file

@ -5,41 +5,21 @@
#include <iomanip>
namespace Mist {
OutProgressiveSRT::OutProgressiveSRT(Socket::Connection & conn) : Output(conn) {
realTime = 0;
myConn.setHost(config->getString("ip"));
streamName = config->getString("streamname");
}
void OutProgressiveSRT::onFail(){
HTTP::Parser HTTP_S;
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
HTTP_S.SetBody("Stream not found. Sorry, we tried.");
HTTP_S.SendResponse("404", "Stream not found", myConn);
Output::onFail();
}
OutProgressiveSRT::OutProgressiveSRT(Socket::Connection & conn) : HTTPOutput(conn){realTime = 0;}
OutProgressiveSRT::~OutProgressiveSRT() {}
void OutProgressiveSRT::init(Util::Config * cfg){
Output::init(cfg);
HTTPOutput::init(cfg);
capa["name"] = "SRT";
capa["desc"] = "Enables HTTP protocol subtitle streaming.";
capa["deps"] = "HTTP";
capa["url_rel"] = "/$.srt";
capa["url_match"] = "/$.srt";
capa["url_handler"] = "http";
capa["url_type"] = "subtitle";
capa["socket"] = "http_srt";
capa["codecs"][0u][0u].append("srt");
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "html5/text/plain";
capa["methods"][0u]["priority"] = 8ll;
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
void OutProgressiveSRT::sendNext(){
@ -65,29 +45,20 @@ namespace Mist {
}
void OutProgressiveSRT::sendHeader(){
HTTP::Parser HTTP_S;
FLV::Tag tag;
HTTP_S.SetHeader("Content-Type", "text/plain");
HTTP_S.protocol = "HTTP/1.0";
myConn.SendNow(HTTP_S.BuildResponse("200", "OK"));
H.SetHeader("Content-Type", "text/plain");
H.protocol = "HTTP/1.0";
H.SendResponse("200", "OK", myConn);
sentHeader = true;
}
void OutProgressiveSRT::onRequest(){
HTTP::Parser HTTP_R;
while (HTTP_R.Read(myConn)){
std::string ua = HTTP_R.GetHeader("User-Agent");
crc = checksum::crc32(0, ua.data(), ua.size());
DEBUG_MSG(DLVL_DEVEL, "Received request %s", HTTP_R.getUrl().c_str());
lastNum = 0;
webVTT = (HTTP_R.url.find(".webvtt") != std::string::npos);
if (HTTP_R.GetVar("track") != ""){
selectedTracks.insert(JSON::Value(HTTP_R.GetVar("track")).asInt());
}
parseData = true;
wantRequest = false;
HTTP_R.Clean();
void OutProgressiveSRT::onHTTP(){
lastNum = 0;
webVTT = (H.url.find(".webvtt") != std::string::npos);
if (H.GetVar("track") != ""){
selectedTracks.insert(JSON::Value(H.GetVar("track")).asInt());
}
parseData = true;
wantRequest = false;
}
}

View file

@ -1,15 +1,14 @@
#include "output.h"
#include "output_http.h"
namespace Mist {
class OutProgressiveSRT : public Output {
class OutProgressiveSRT : public HTTPOutput {
public:
OutProgressiveSRT(Socket::Connection & conn);
~OutProgressiveSRT();
static void init(Util::Config * cfg);
void onRequest();
void onHTTP();
void sendNext();
void onFail();
void sendHeader();
protected:
bool webVTT;