/// \file conn_http.cpp /// Contains the main code for the HTTP Connector #include #include #include #include #include #include #include #include #include #include #include #include // #include #include #include #include #include #include #include #include #include #include #include "embed.js.h" /// Holds everything unique to HTTP Connectors. namespace Connector_HTTP { static inline void builPipedPart(JSON::Value & p, char * argarr[], int & argnum, JSON::Value & argset){ for (JSON::ObjIter it = argset.ObjBegin(); it != argset.ObjEnd(); ++it){ if (it->second.isMember("option") && p.isMember(it->first)){ if (it->second.isMember("type")){ if (it->second["type"].asStringRef() == "str" && !p[it->first].isString()){ p[it->first] = p[it->first].asString(); } if ((it->second["type"].asStringRef() == "uint" || it->second["type"].asStringRef() == "int") && !p[it->first].isInt()){ p[it->first] = JSON::Value(p[it->first].asInt()).asString(); } } if (p[it->first].asStringRef().size() > 0){ argarr[argnum++] = (char*)(it->second["option"].c_str()); argarr[argnum++] = (char*)(p[it->first].c_str()); } } } } /// Class for keeping track of connections to connectors. class ConnConn{ public: Socket::Connection * conn; ///< The socket of this connection unsigned int lastUse; ///< Seconds since last use of this connection. tthread::mutex inUse; ///< Mutex for this connection. /// Constructor that sets the socket and lastUse to 0. ConnConn(){ conn = 0; lastUse = 0; } /// Constructor that sets lastUse to 0, but socket to s. ConnConn(Socket::Connection * s){ conn = s; lastUse = 0; } /// Destructor that deletes the socket if non-null. ~ConnConn(){ if (conn){ conn->close(); delete conn; } conn = 0; } }; std::map connectorConnections; ///< Connections to connectors tthread::mutex connMutex; ///< Mutex for adding/removing connector connections. bool timeoutThreadStarted = false; tthread::mutex timeoutStartMutex; ///< Mutex for starting timeout thread. tthread::mutex timeoutMutex; ///< Mutex for timeout thread. tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors. JSON::Value capabilities; ///< Holds a list of all HTTP connectors and their properties JSON::Value ServConf; /// < holds configuration, loads from file in main void updateConfig(){ static unsigned long long int confUpdateTime=0; static tthread::mutex updateLock; if( Util::bootSecs() -confUpdateTime > 10 ){ tthread::lock_guard guard(updateLock); if( Util::bootSecs() -confUpdateTime > 10 ){ ServConf = JSON::fromFile(Util::getTmpFolder() + "streamlist"); confUpdateTime=Util::bootSecs(); } } } ///\brief Function run as a thread to timeout requests on the proxy. ///\param n A NULL-pointer void proxyTimeoutThread(void * n){ n = 0; //prevent unused variable warning tthread::lock_guard guard(timeoutMutex); timeoutThreadStarted = true; while (true){ { tthread::lock_guard guard(connMutex); if (connectorConnections.empty()){ return; } std::map::iterator it; for (it = connectorConnections.begin(); it != connectorConnections.end(); it++){ ConnConn* ccPointer = it->second; if ( !ccPointer->conn->connected() || ccPointer->lastUse++ > 15){ if (ccPointer->inUse.try_lock()){ connectorConnections.erase(it); ccPointer->inUse.unlock(); delete ccPointer; it = connectorConnections.begin(); //get a valid iterator if (it == connectorConnections.end()){ return; } } } } } usleep(1000000); //sleep 1 second and re-check } } ///\brief Handles requests without associated handler. /// ///Displays a friendly error message. ///\param H The request to be handled. ///\param conn The connection to the client that issued the request. ///\return A timestamp indicating when the request was parsed. long long int proxyHandleUnsupported(HTTP::Parser & H, Socket::Connection & conn){ H.Clean(); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetBody( "Unsupported Media Type

Unsupported Media Type

The server isn't quite sure what you wanted to receive from it."); long long int ret = Util::getMS(); conn.SendNow(H.BuildResponse("415", "Unsupported Media Type")); return ret; } ///\brief Handles requests that have timed out. /// ///Displays a friendly error message. ///\param H The request that was being handled upon timeout. ///\param conn The connection to the client that issued the request. ///\param msg The message to print to the client. ///\return A timestamp indicating when the request was parsed. long long int proxyHandleTimeout(HTTP::Parser & H, Socket::Connection & conn, std::string msg){ H.Clean(); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetBody( ""+msg+"

"+msg+"

Though the server understood your request and attempted to handle it, somehow handling it took longer than it should. Your request has been cancelled - please try again later."); long long int ret = Util::getMS(); conn.SendNow(H.BuildResponse("504", msg)); return ret; } /// Sorts the JSON::Value objects that hold source information by preference. struct sourceCompare { bool operator() (const JSON::Value& lhs, const JSON::Value& rhs) const { //first compare simultaneous tracks if (lhs["simul_tracks"].asInt() > rhs["simul_tracks"].asInt()){ //more tracks = higher priority = true. return true; } if (lhs["simul_tracks"].asInt() < rhs["simul_tracks"].asInt()){ //less tracks = lower priority = false return false; } //same amount of tracks - compare "hardcoded" priorities if (lhs["priority"].asInt() > rhs["priority"].asInt()){ //higher priority = true. return true; } if (lhs["priority"].asInt() < rhs["priority"].asInt()){ //lower priority = false return false; } //same priority - compare total matches if (lhs["total_matches"].asInt() > rhs["total_matches"].asInt()){ //more matches = higher priority = true. return true; } if (lhs["total_matches"].asInt() < rhs["total_matches"].asInt()){ //less matches = lower priority = false return false; } //also same amount of matches? just compare the URL then. return lhs["url"].asStringRef() < rhs["url"].asStringRef(); } }; void addSource(const std::string & rel, std::set & sources, std::string & host, const std::string & port, JSON::Value & conncapa, unsigned int most_simul, unsigned int total_matches){ JSON::Value tmp; tmp["type"] = conncapa["type"]; tmp["relurl"] = rel; tmp["priority"] = conncapa["priority"]; tmp["simul_tracks"] = most_simul; tmp["total_matches"] = total_matches; tmp["url"] = conncapa["handler"].asStringRef() + "://" + host + ":" + port + rel; sources.insert(tmp); } void addSources(std::string & streamname, const std::string & rel, std::set & sources, std::string & host, const std::string & port, JSON::Value & conncapa, JSON::Value & strmMeta){ unsigned int most_simul = 0; unsigned int total_matches = 0; if (conncapa.isMember("codecs") && conncapa["codecs"].size() > 0){ for (JSON::ArrIter it = conncapa["codecs"].ArrBegin(); it != conncapa["codecs"].ArrEnd(); it++){ unsigned int simul = 0; if ((*it).size() > 0){ for (JSON::ArrIter itb = (*it).ArrBegin(); itb != (*it).ArrEnd(); itb++){ unsigned int matches = 0; if ((*itb).size() > 0){ for (JSON::ArrIter itc = (*itb).ArrBegin(); itc != (*itb).ArrEnd(); itc++){ for (JSON::ObjIter trit = strmMeta["tracks"].ObjBegin(); trit != strmMeta["tracks"].ObjEnd(); trit++){ if (trit->second["codec"].asStringRef() == (*itc).asStringRef()){ matches++; total_matches++; } } } } if (matches){ simul++; } } } if (simul > most_simul){ most_simul = simul; } } } if (conncapa.isMember("methods") && conncapa["methods"].size() > 0){ std::string relurl; size_t found = rel.find('$'); if (found != std::string::npos){ relurl = rel.substr(0, found) + streamname + rel.substr(found+1); }else{ relurl = "/"; } for (JSON::ArrIter it = conncapa["methods"].ArrBegin(); it != conncapa["methods"].ArrEnd(); it++){ if (!strmMeta.isMember("live") || !it->isMember("nolive")){ addSource(relurl, sources, host, port, *it, most_simul, total_matches); } } } } ///\brief Handles requests within the proxy. /// ///Currently supported urls: /// - /crossdomain.xml /// - /clientaccesspolicy.xml /// - *.ico (for favicon) /// - /info_[streamname].js (stream info) /// - /embed_[streamname].js (embed info) /// ///Unsupported urls default to proxyHandleUnsupported( ). ///\param H The request to be handled. ///\param conn The connection to the client that issued the request. ///\return A timestamp indicating when the request was parsed. long long int proxyHandleInternal(HTTP::Parser & H, Socket::Connection & conn){ updateConfig(); std::string url = H.getUrl(); if (url == "/crossdomain.xml"){ H.Clean(); H.SetHeader("Content-Type", "text/xml"); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetBody( ""); long long int ret = Util::getMS(); conn.SendNow(H.BuildResponse("200", "OK")); return ret; } //crossdomain.xml if (url == "/clientaccesspolicy.xml"){ H.Clean(); H.SetHeader("Content-Type", "text/xml"); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetBody( ""); long long int ret = Util::getMS(); conn.SendNow(H.BuildResponse("200", "OK")); return ret; } //clientaccesspolicy.xml // send logo icon if (url.length() > 4 && url.substr(url.length() - 4, 4) == ".ico"){ H.Clean(); #include "icon.h" H.SetHeader("Content-Type", "image/x-icon"); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Content-Length", icon_len); H.SetBody(""); long long int ret = Util::getMS(); conn.SendNow(H.BuildResponse("200", "OK")); conn.SendNow((const char*)icon_data, icon_len); return ret; } // send logo icon if (url.length() > 6 && url.substr(url.length() - 5, 5) == ".html"){ std::string streamname = url.substr(1, url.length() - 6); Util::Stream::sanitizeName(streamname); H.Clean(); H.SetHeader("Content-Type", "text/html"); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetBody("Stream "+streamname+""); long long int ret = Util::getMS(); conn.SendNow(H.BuildResponse("200", "OK")); return ret; } // send smil MBR index if (url.length() > 6 && url.substr(url.length() - 5, 5) == ".smil"){ std::string streamname = url.substr(1, url.length() - 6); Util::Stream::sanitizeName(streamname); std::string host = H.GetHeader("Host"); if (host.find(':')){ host.resize(host.find(':')); } std::string port, url_rel; for (JSON::ArrIter it = ServConf["config"]["protocols"].ArrBegin(); it != ServConf["config"]["protocols"].ArrEnd(); it++){ const std::string & cName = ( *it)["connector"].asStringRef(); if (cName != "RTMP"){continue;} //if we have the RTMP port, if (capabilities.isMember(cName) && capabilities[cName].isMember("optional") && capabilities[cName]["optional"].isMember("port")){ //get the default port if none is set if (( *it)["port"].asInt() == 0){ port = capabilities[cName]["optional"]["port"]["default"].asString(); } //extract url if (capabilities[cName].isMember("url_rel")){ url_rel = capabilities[cName]["url_rel"].asString(); if (url_rel.find('$')){ url_rel.resize(url_rel.find('$')); } } } } std::string trackSources;//this string contains all track sources for MBR smil for (JSON::ObjIter it = ServConf["streams"][streamname]["meta"]["tracks"].ObjBegin(); it != ServConf["streams"][streamname]["meta"]["tracks"].ObjEnd(); it++){//for all tracks if (it->second.isMember("type") && it->second["type"].asString() == "video"){ trackSources += "