Refactor and doxygen of all connectors finished. Internal documentation follows later.

This commit is contained in:
Erik Zandvliet 2013-03-28 12:05:45 +01:00
parent 570aa95315
commit 9b41a07c2f
7 changed files with 809 additions and 763 deletions

View file

@ -30,18 +30,18 @@ namespace Connector_HTTP {
class ConnConn{ class ConnConn{
public: public:
Socket::Connection * conn; ///< The socket of this connection Socket::Connection * conn; ///< The socket of this connection
unsigned int lastuse; ///< Seconds since last use of this connection. unsigned int lastUse; ///< Seconds since last use of this connection.
tthread::mutex in_use; ///< Mutex for this connection. tthread::mutex inUse; ///< Mutex for this connection.
/// Constructor that sets the socket and lastuse to 0. /// Constructor that sets the socket and lastUse to 0.
ConnConn(){ ConnConn(){
conn = 0; conn = 0;
lastuse = 0; lastUse = 0;
} }
; ;
/// Constructor that sets lastuse to 0, but socket to s. /// Constructor that sets lastUse to 0, but socket to s.
ConnConn(Socket::Connection * s){ ConnConn(Socket::Connection * s){
conn = s; conn = s;
lastuse = 0; lastUse = 0;
} }
; ;
/// Destructor that deletes the socket if non-null. /// Destructor that deletes the socket if non-null.
@ -55,44 +55,51 @@ namespace Connector_HTTP {
; ;
}; };
std::map<std::string, ConnConn *> connconn; ///< Connections to connectors std::map<std::string, ConnConn *> connectorConnections; ///< Connections to connectors
std::set<tthread::thread *> active_threads; ///< Holds currently active threads std::set<tthread::thread *> activeThreads; ///< Holds currently active threads
std::set<tthread::thread *> done_threads; ///< Holds threads that are done and ready to be joined. std::set<tthread::thread *> doneThreads; ///< Holds threads that are done and ready to be joined.
tthread::mutex thread_mutex; ///< Mutex for adding/removing threads. tthread::mutex threadMutex; ///< Mutex for adding/removing threads.
tthread::mutex conn_mutex; ///< Mutex for adding/removing connector connections. tthread::mutex connMutex; ///< Mutex for adding/removing connector connections.
tthread::mutex timeout_mutex; ///< Mutex for timeout thread. tthread::mutex timeoutMutex; ///< Mutex for timeout thread.
tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors. tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors.
///\brief Function run as a thread to timeout requests on the proxy.
///\param n A NULL-pointer
void proxyTimeoutThread(void * n){ void proxyTimeoutThread(void * n){
n = 0; //prevent unused variable warning n = 0; //prevent unused variable warning
tthread::lock_guard<tthread::mutex> guard(timeout_mutex); tthread::lock_guard<tthread::mutex> guard(timeoutMutex);
while (true){ while (true){
{ {
tthread::lock_guard<tthread::mutex> guard(conn_mutex); tthread::lock_guard<tthread::mutex> guard(connMutex);
if (connconn.empty()){ if (connectorConnections.empty()){
return; return;
} }
std::map<std::string, ConnConn*>::iterator it; std::map<std::string, ConnConn*>::iterator it;
for (it = connconn.begin(); it != connconn.end(); it++){ for (it = connectorConnections.begin(); it != connectorConnections.end(); it++){
if ( !it->second->conn->connected() || it->second->lastuse++ > 15){ if ( !it->second->conn->connected() || it->second->lastUse++ > 15){
if (it->second->in_use.try_lock()){ if (it->second->inUse.try_lock()){
it->second->in_use.unlock(); it->second->inUse.unlock();
delete it->second; delete it->second;
connconn.erase(it); connectorConnections.erase(it);
it = connconn.begin(); //get a valid iterator it = connectorConnections.begin(); //get a valid iterator
if (it == connconn.end()){ if (it == connectorConnections.end()){
return; return;
} }
} }
} }
} }
conn_mutex.unlock(); connMutex.unlock();
} }
usleep(1000000); //sleep 1 second and re-check usleep(1000000); //sleep 1 second and re-check
} }
} }
/// Handles requests without associated handler, displaying a nice friendly error message. ///\brief Handles requests without associated handler.
///
///Displays a friendly error message.
///\param H The request to be handled.
///\param conn The connection to the client that issued the request.
///\return A timestamp indicating when the request was parsed.
long long int proxyHandleUnsupported(HTTP::Parser & H, Socket::Connection * conn){ long long int proxyHandleUnsupported(HTTP::Parser & H, Socket::Connection * conn){
H.Clean(); H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
@ -103,6 +110,12 @@ namespace Connector_HTTP {
return ret; return ret;
} }
///\brief Handles requests that have timed out.
///
///Displays a friendly error message.
///\param H The request that was being handled upon timeout.
///\param conn The connection to the client that issued the request.
///\return A timestamp indicating when the request was parsed.
long long int proxyHandleTimeout(HTTP::Parser & H, Socket::Connection * conn){ long long int proxyHandleTimeout(HTTP::Parser & H, Socket::Connection * conn){
H.Clean(); H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
@ -113,7 +126,19 @@ namespace Connector_HTTP {
return ret; return ret;
} }
/// Handles internal requests. ///\brief Handles requests within the proxy.
///
///Currently supported urls:
/// - /crossdomain.xml
/// - /clientaccesspolicy.xml
/// - *.ico (for favicon)
/// - /info_[streamname].js (stream info)
/// - /embed_[streamname].js (embed info)
///
///Unsupported urls default to proxyHandleUnsupported( ).
///\param H The request to be handled.
///\param conn The connection to the client that issued the request.
///\return A timestamp indicating when the request was parsed.
long long int proxyHandleInternal(HTTP::Parser & H, Socket::Connection * conn){ long long int proxyHandleInternal(HTTP::Parser & H, Socket::Connection * conn){
std::string url = H.getUrl(); std::string url = H.getUrl();
@ -263,7 +288,11 @@ namespace Connector_HTTP {
return proxyHandleUnsupported(H, conn); //anything else doesn't get handled return proxyHandleUnsupported(H, conn); //anything else doesn't get handled
} }
/// Handles requests without associated handler, displaying a nice friendly error message. ///\brief Handles requests by dispatching them to the corresponding connector.
///\param H The request to be handled.
///\param conn The connection to the client that issued the request.
///\param connector The type of connector to be invoked.
///\return A timestamp indicating when the request was parsed.
long long int proxyHandleThroughConnector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){ long long int proxyHandleThroughConnector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){
//create a unique ID based on a hash of the user agent and host, followed by the stream name and connector //create a unique ID based on a hash of the user agent and host, followed by the stream name and connector
std::string uid = Secure::md5(H.GetHeader("User-Agent") + conn->getHost()) + "_" + H.GetVar("stream") + "_" + connector; std::string uid = Secure::md5(H.GetHeader("User-Agent") + conn->getHost()) + "_" + H.GetVar("stream") + "_" + connector;
@ -275,13 +304,13 @@ namespace Connector_HTTP {
H.Clean(); H.Clean();
//check if a connection exists, and if not create one //check if a connection exists, and if not create one
conn_mutex.lock(); connMutex.lock();
if ( !connconn.count(uid) || !connconn[uid]->conn->connected()){ if ( !connectorConnections.count(uid) || !connectorConnections[uid]->conn->connected()){
if (connconn.count(uid)){ if (connectorConnections.count(uid)){
connconn.erase(uid); connectorConnections.erase(uid);
} }
connconn[uid] = new ConnConn(new Socket::Connection("/tmp/mist/http_" + connector)); connectorConnections[uid] = new ConnConn(new Socket::Connection("/tmp/mist/http_" + connector));
connconn[uid]->conn->setBlocking(false); //do not block on spool() with no data connectorConnections[uid]->conn->setBlocking(false); //do not block on spool() with no data
#if DEBUG >= 4 #if DEBUG >= 4
std::cout << "Created new connection " << uid << std::endl; std::cout << "Created new connection " << uid << std::endl;
#endif #endif
@ -291,56 +320,56 @@ namespace Connector_HTTP {
#endif #endif
} }
//start a new timeout thread, if neccesary //start a new timeout thread, if neccesary
if (timeout_mutex.try_lock()){ if (timeoutMutex.try_lock()){
if (timeouter){ if (timeouter){
timeouter->join(); timeouter->join();
delete timeouter; delete timeouter;
} }
timeouter = new tthread::thread(Connector_HTTP::proxyTimeoutThread, 0); timeouter = new tthread::thread(Connector_HTTP::proxyTimeoutThread, 0);
timeout_mutex.unlock(); timeoutMutex.unlock();
} }
conn_mutex.unlock(); connMutex.unlock();
//lock the mutex for this connection, and handle the request //lock the mutex for this connection, and handle the request
tthread::lock_guard<tthread::mutex> guard(connconn[uid]->in_use); tthread::lock_guard<tthread::mutex> guard(connectorConnections[uid]->inUse);
//if the server connection is dead, handle as timeout. //if the server connection is dead, handle as timeout.
if ( !connconn.count(uid) || !connconn[uid]->conn->connected()){ if ( !connectorConnections.count(uid) || !connectorConnections[uid]->conn->connected()){
connconn[uid]->conn->close(); connectorConnections[uid]->conn->close();
return proxyHandleTimeout(H, conn); return proxyHandleTimeout(H, conn);
} }
//forward the original request //forward the original request
connconn[uid]->conn->SendNow(request); connectorConnections[uid]->conn->SendNow(request);
connconn[uid]->lastuse = 0; connectorConnections[uid]->lastUse = 0;
unsigned int timeout = 0; unsigned int timeout = 0;
unsigned int retries = 0; unsigned int retries = 0;
//wait for a response //wait for a response
while (connconn.count(uid) && connconn[uid]->conn->connected() && conn->connected()){ while (connectorConnections.count(uid) && connectorConnections[uid]->conn->connected() && conn->connected()){
conn->spool(); conn->spool();
if (connconn[uid]->conn->Received().size() || connconn[uid]->conn->spool()){ if (connectorConnections[uid]->conn->Received().size() || connectorConnections[uid]->conn->spool()){
//make sure we end in a \n //make sure we end in a \n
if ( *(connconn[uid]->conn->Received().get().rbegin()) != '\n'){ if ( *(connectorConnections[uid]->conn->Received().get().rbegin()) != '\n'){
std::string tmp = connconn[uid]->conn->Received().get(); std::string tmp = connectorConnections[uid]->conn->Received().get();
connconn[uid]->conn->Received().get().clear(); connectorConnections[uid]->conn->Received().get().clear();
if (connconn[uid]->conn->Received().size()){ if (connectorConnections[uid]->conn->Received().size()){
connconn[uid]->conn->Received().get().insert(0, tmp); connectorConnections[uid]->conn->Received().get().insert(0, tmp);
}else{ }else{
connconn[uid]->conn->Received().append(tmp); connectorConnections[uid]->conn->Received().append(tmp);
} }
} }
//check if the whole response was received //check if the whole response was received
if (H.Read(connconn[uid]->conn->Received().get())){ if (H.Read(connectorConnections[uid]->conn->Received().get())){
//208 means the fragment is too new, retry in 3s //208 means the fragment is too new, retry in 3s
if (H.url == "208"){ if (H.url == "208"){
retries++; retries++;
if (retries >= 5){ if (retries >= 5){
std::cout << "[5 retry-laters, cancelled]" << std::endl; std::cout << "[5 retry-laters, cancelled]" << std::endl;
connconn[uid]->conn->close(); connectorConnections[uid]->conn->close();
return proxyHandleTimeout(H, conn); return proxyHandleTimeout(H, conn);
} }
connconn[uid]->lastuse = 0; connectorConnections[uid]->lastUse = 0;
timeout = 0; timeout = 0;
Util::sleep(3000); Util::sleep(3000);
connconn[uid]->conn->SendNow(request); connectorConnections[uid]->conn->SendNow(request);
H.Clean(); H.Clean();
continue; continue;
} }
@ -350,16 +379,16 @@ namespace Connector_HTTP {
//keep trying unless the timeout triggers //keep trying unless the timeout triggers
if (timeout++ > 4000){ if (timeout++ > 4000){
std::cout << "[20s timeout triggered]" << std::endl; std::cout << "[20s timeout triggered]" << std::endl;
connconn[uid]->conn->close(); connectorConnections[uid]->conn->close();
return proxyHandleTimeout(H, conn); return proxyHandleTimeout(H, conn);
}else{ }else{
Util::sleep(5); Util::sleep(5);
} }
} }
} }
if ( !connconn.count(uid) || !connconn[uid]->conn->connected() || !conn->connected()){ if ( !connectorConnections.count(uid) || !connectorConnections[uid]->conn->connected() || !conn->connected()){
//failure, disconnect and sent error to user //failure, disconnect and sent error to user
connconn[uid]->conn->close(); connectorConnections[uid]->conn->close();
return proxyHandleTimeout(H, conn); return proxyHandleTimeout(H, conn);
}else{ }else{
long long int ret = Util::getMS(); long long int ret = Util::getMS();
@ -375,9 +404,9 @@ namespace Connector_HTTP {
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
conn->SendNow(H.BuildResponse("200", "OK")); conn->SendNow(H.BuildResponse("200", "OK"));
//switch out the connection for an empty one - it makes no sense to keep these globally //switch out the connection for an empty one - it makes no sense to keep these globally
Socket::Connection * myConn = connconn[uid]->conn; Socket::Connection * myConn = connectorConnections[uid]->conn;
connconn[uid]->conn = new Socket::Connection(); connectorConnections[uid]->conn = new Socket::Connection();
connconn[uid]->in_use.unlock(); connectorConnections[uid]->inUse.unlock();
//continue sending data from this socket and keep it permanently in use //continue sending data from this socket and keep it permanently in use
while (myConn->connected() && conn->connected()){ while (myConn->connected() && conn->connected()){
if (myConn->Received().size() || myConn->spool()){ if (myConn->Received().size() || myConn->spool()){
@ -396,12 +425,16 @@ namespace Connector_HTTP {
} }
} }
/// Returns the name of the HTTP connector the given request should be served by. ///\brief Determines the type of connector to be used for handling a request.
/// Can currently return: ///\param H The request to be handled..
/// - none (request not supported) ///\return A string indicating the type of connector.
/// - internal (request fed from information internal to this connector) ///Possible values are:
/// - dynamic (request fed from http_dynamic connector) /// - "none" The request is not supported.
/// - progressive (request fed from http_progressive connector) /// - "internal" The request should be handled by the proxy itself.
/// - "dynamic" The request should be dispatched to the HTTP Dynamic Connector
/// - "progressive" The request should be dispatched to the HTTP Progressive Connector
/// - "smooth" The request should be dispatched to the HTTP Smooth Connector
/// - "live" The request should be dispatched to the HTTP Live Connector
std::string proxyGetHandleType(HTTP::Parser & H){ std::string proxyGetHandleType(HTTP::Parser & H){
std::string url = H.getUrl(); std::string url = H.getUrl();
if (url.find("/dynamic/") != std::string::npos){ if (url.find("/dynamic/") != std::string::npos){
@ -449,7 +482,8 @@ namespace Connector_HTTP {
return "none"; return "none";
} }
/// Thread for handling a single HTTP connection ///\brief Function run as a thread to handle a single HTTP connection.
///\param pointer A Socket::Connection* indicating the connection to th client.
void proxyHandleHTTPConnection(void * pointer){ void proxyHandleHTTPConnection(void * pointer){
Socket::Connection * conn = (Socket::Connection *)pointer; Socket::Connection * conn = (Socket::Connection *)pointer;
conn->setBlocking(false); //do not block on conn.spool() when no data is available conn->setBlocking(false); //do not block on conn.spool() when no data is available
@ -504,17 +538,17 @@ namespace Connector_HTTP {
//close and remove the connection //close and remove the connection
conn->close(); conn->close();
delete conn; delete conn;
//remove this thread from active_threads and add it to done_threads. //remove this thread from activeThreads and add it to doneThreads.
thread_mutex.lock(); threadMutex.lock();
for (std::set<tthread::thread *>::iterator it = active_threads.begin(); it != active_threads.end(); it++){ for (std::set<tthread::thread *>::iterator it = activeThreads.begin(); it != activeThreads.end(); it++){
if (( *it)->get_id() == tthread::this_thread::get_id()){ if (( *it)->get_id() == tthread::this_thread::get_id()){
tthread::thread * T = ( *it); tthread::thread * T = ( *it);
active_threads.erase(T); activeThreads.erase(T);
done_threads.insert(T); doneThreads.insert(T);
break; break;
} }
} }
thread_mutex.unlock(); threadMutex.unlock();
} }
} //Connector_HTTP namespace } //Connector_HTTP namespace
@ -533,17 +567,17 @@ int main(int argc, char ** argv){
Socket::Connection S = server_socket.accept(); Socket::Connection S = server_socket.accept();
if (S.connected()){ //check if the new connection is valid if (S.connected()){ //check if the new connection is valid
//lock the thread mutex and spawn a new thread for this connection //lock the thread mutex and spawn a new thread for this connection
Connector_HTTP::thread_mutex.lock(); Connector_HTTP::threadMutex.lock();
tthread::thread * T = new tthread::thread(Connector_HTTP::proxyHandleHTTPConnection, (void *)(new Socket::Connection(S))); tthread::thread * T = new tthread::thread(Connector_HTTP::proxyHandleHTTPConnection, (void *)(new Socket::Connection(S)));
Connector_HTTP::active_threads.insert(T); Connector_HTTP::activeThreads.insert(T);
//clean up any threads that may have finished //clean up any threads that may have finished
while ( !Connector_HTTP::done_threads.empty()){ while ( !Connector_HTTP::doneThreads.empty()){
T = *Connector_HTTP::done_threads.begin(); T = *Connector_HTTP::doneThreads.begin();
T->join(); T->join();
Connector_HTTP::done_threads.erase(T); Connector_HTTP::doneThreads.erase(T);
delete T; delete T;
} }
Connector_HTTP::thread_mutex.unlock(); Connector_HTTP::threadMutex.unlock();
}else{ }else{
Util::sleep(10); //sleep 10ms Util::sleep(10); //sleep 10ms
} }
@ -553,16 +587,16 @@ int main(int argc, char ** argv){
//wait for existing connections to drop //wait for existing connections to drop
bool repeat = true; bool repeat = true;
while (repeat){ while (repeat){
Connector_HTTP::thread_mutex.lock(); Connector_HTTP::threadMutex.lock();
repeat = !Connector_HTTP::active_threads.empty(); repeat = !Connector_HTTP::activeThreads.empty();
//clean up any threads that may have finished //clean up any threads that may have finished
while ( !Connector_HTTP::done_threads.empty()){ while ( !Connector_HTTP::doneThreads.empty()){
tthread::thread * T = *Connector_HTTP::done_threads.begin(); tthread::thread * T = *Connector_HTTP::doneThreads.begin();
T->join(); T->join();
Connector_HTTP::done_threads.erase(T); Connector_HTTP::doneThreads.erase(T);
delete T; delete T;
} }
Connector_HTTP::thread_mutex.unlock(); Connector_HTTP::threadMutex.unlock();
if (repeat){ if (repeat){
Util::sleep(100); //sleep 100ms Util::sleep(100); //sleep 100ms
} }

View file

@ -26,11 +26,11 @@
/// Holds everything unique to HTTP Connectors. /// Holds everything unique to HTTP Connectors.
namespace Connector_HTTP { namespace Connector_HTTP {
///\brief Builds a bootstrap for use in HTTP Dynamic streaming. ///\brief Builds a bootstrap for use in HTTP Dynamic streaming.
///\param MovieId The name of the movie. ///\param streamName The name of the stream.
///\param metadata The current metadata, used to generate the index. ///\param metadata The current metadata, used to generate the index.
///\param fragnum The index of the current fragment ///\param fragnum The index of the current fragment
///\return The generated bootstrap. ///\return The generated bootstrap.
std::string dynamicBootstrap(std::string & MovieId, JSON::Value & metadata, int fragnum = 0){ std::string dynamicBootstrap(std::string & streamName, JSON::Value & metadata, int fragnum = 0){
std::string empty; std::string empty;
MP4::ASRT asrt; MP4::ASRT asrt;
@ -80,7 +80,7 @@ namespace Connector_HTTP {
abst.setLive(false); abst.setLive(false);
abst.setCurrentMediaTime(metadata["lastms"].asInt()); abst.setCurrentMediaTime(metadata["lastms"].asInt());
abst.setSmpteTimeCodeOffset(0); abst.setSmpteTimeCodeOffset(0);
abst.setMovieIdentifier(MovieId); abst.setMovieIdentifier(streamName);
abst.setSegmentRunTable(asrt, 0); abst.setSegmentRunTable(asrt, 0);
abst.setFragmentRunTable(afrt, 0); abst.setFragmentRunTable(afrt, 0);
@ -91,24 +91,24 @@ namespace Connector_HTTP {
} }
///\brief Builds an index file for HTTP Dynamic streaming. ///\brief Builds an index file for HTTP Dynamic streaming.
///\param MovieId The name of the movie. ///\param streamName The name of the stream.
///\param metadata The current metadata, used to generate the index. ///\param metadata The current metadata, used to generate the index.
///\return The index file for HTTP Dynamic Streaming. ///\return The index file for HTTP Dynamic Streaming.
std::string dynamicIndex(std::string & MovieId, JSON::Value & metadata){ std::string dynamicIndex(std::string & streamName, JSON::Value & metadata){
std::string Result; std::string Result;
if (metadata.isMember("vod")){ if (metadata.isMember("vod")){
Result = Result =
"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
"<manifest xmlns=\"http://ns.adobe.com/f4m/1.0\">\n" "<manifest xmlns=\"http://ns.adobe.com/f4m/1.0\">\n"
"<id>" + MovieId + "</id>\n" "<id>" + streamName + "</id>\n"
"<width>" + metadata["video"]["width"].asString() + "</width>\n" "<width>" + metadata["video"]["width"].asString() + "</width>\n"
"<height>" + metadata["video"]["height"].asString() + "</height>\n" "<height>" + metadata["video"]["height"].asString() + "</height>\n"
"<duration>" + metadata["length"].asString() + ".000</duration>\n" "<duration>" + metadata["length"].asString() + ".000</duration>\n"
"<mimeType>video/mp4</mimeType>\n" "<mimeType>video/mp4</mimeType>\n"
"<streamType>recorded</streamType>\n" "<streamType>recorded</streamType>\n"
"<deliveryType>streaming</deliveryType>\n" "<deliveryType>streaming</deliveryType>\n"
"<bootstrapInfo profile=\"named\" id=\"bootstrap1\">" + Base64::encode(dynamicBootstrap(MovieId, metadata)) + "</bootstrapInfo>\n" "<bootstrapInfo profile=\"named\" id=\"bootstrap1\">" + Base64::encode(dynamicBootstrap(streamName, metadata)) + "</bootstrapInfo>\n"
"<media streamId=\"1\" bootstrapInfoId=\"bootstrap1\" url=\"" + MovieId + "/\">\n" "<media streamId=\"1\" bootstrapInfoId=\"bootstrap1\" url=\"" + streamName + "/\">\n"
"<metadata>AgAKb25NZXRhRGF0YQMAAAk=</metadata>\n" "<metadata>AgAKb25NZXRhRGF0YQMAAAk=</metadata>\n"
"</media>\n" "</media>\n"
"</manifest>\n"; "</manifest>\n";
@ -116,15 +116,15 @@ namespace Connector_HTTP {
Result = Result =
"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
"<manifest xmlns=\"http://ns.adobe.com/f4m/1.0\">\n" "<manifest xmlns=\"http://ns.adobe.com/f4m/1.0\">\n"
"<id>" + MovieId + "</id>\n" "<id>" + streamName + "</id>\n"
"<dvrInfo windowDuration=\"" + metadata["buffer_window"].asString().substr(0, metadata["buffer_window"].asString().size() - 3) + "\"></dvrInfo>" "<dvrInfo windowDuration=\"" + metadata["buffer_window"].asString().substr(0, metadata["buffer_window"].asString().size() - 3) + "\"></dvrInfo>"
"<mimeType>video/mp4</mimeType>\n" "<mimeType>video/mp4</mimeType>\n"
"<streamType>live</streamType>\n" "<streamType>live</streamType>\n"
"<deliveryType>streaming</deliveryType>\n" "<deliveryType>streaming</deliveryType>\n"
"<media url=\"" + MovieId + "/\">\n" "<media url=\"" + streamName + "/\">\n"
"<metadata>AgAKb25NZXRhRGF0YQMAAAk=</metadata>\n" "<metadata>AgAKb25NZXRhRGF0YQMAAAk=</metadata>\n"
"</media>\n" "</media>\n"
"<bootstrapInfo profile=\"named\" url=\"" + MovieId + ".abst\" />\n" "<bootstrapInfo profile=\"named\" url=\"" + streamName + ".abst\" />\n"
"</manifest>\n"; "</manifest>\n";
} }
#if DEBUG >= 8 #if DEBUG >= 8

View file

@ -26,10 +26,9 @@
/// Holds everything unique to HTTP Connectors. /// Holds everything unique to HTTP Connectors.
namespace Connector_HTTP { namespace Connector_HTTP {
///\brief Builds an index file for HTTP Live streaming. ///\brief Builds an index file for HTTP Live streaming.
///\param MovieId The name of the movie.
///\param metadata The current metadata, used to generate the index. ///\param metadata The current metadata, used to generate the index.
///\return The index file for HTTP Live Streaming. ///\return The index file for HTTP Live Streaming.
std::string liveIndex(std::string & MovieId, JSON::Value & metadata){ std::string liveIndex(JSON::Value & metadata){
std::stringstream Result; std::stringstream Result;
if ( !metadata.isMember("live")){ if ( !metadata.isMember("live")){
int longestFragment = 0; int longestFragment = 0;
@ -62,7 +61,7 @@ namespace Connector_HTTP {
return Result.str(); return Result.str();
} //liveIndex } //liveIndex
///\brief Main function for the HTTP HLS Connector ///\brief Main function for the HTTP Live Connector
///\param conn A socket describing the connection the client. ///\param conn A socket describing the connection the client.
///\return The exit code of the connector. ///\return The exit code of the connector.
int liveConnector(Socket::Connection conn){ int liveConnector(Socket::Connection conn){
@ -175,7 +174,7 @@ namespace Connector_HTTP {
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", manifestType); HTTP_S.SetHeader("Content-Type", manifestType);
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
std::string manifest = liveIndex(streamname, Strm.metadata); std::string manifest = liveIndex(Strm.metadata);
HTTP_S.SetBody(manifest); HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
} }

View file

@ -28,10 +28,9 @@
///\brief Holds everything unique to HTTP Connectors. ///\brief Holds everything unique to HTTP Connectors.
namespace Connector_HTTP { namespace Connector_HTTP {
///\brief Builds an index file for HTTP Smooth streaming. ///\brief Builds an index file for HTTP Smooth streaming.
///\param MovieId The name of the movie.
///\param metadata The current metadata, used to generate the index. ///\param metadata The current metadata, used to generate the index.
///\return The index file for HTTP Smooth Streaming. ///\return The index file for HTTP Smooth Streaming.
std::string smoothIndex(std::string & MovieId, JSON::Value & metadata){ std::string smoothIndex(JSON::Value & metadata){
std::stringstream Result; std::stringstream Result;
Result << "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"; Result << "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n";
Result << "<SmoothStreamingMedia " Result << "<SmoothStreamingMedia "
@ -245,7 +244,7 @@ namespace Connector_HTTP {
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml"); HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
std::string manifest = smoothIndex(streamname, Strm.metadata); std::string manifest = smoothIndex(Strm.metadata);
HTTP_S.SetBody(manifest); HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
} }

View file

@ -8,9 +8,10 @@
#include <mist/stream.h> #include <mist/stream.h>
#include <mist/timing.h> #include <mist/timing.h>
/// Contains the main code for the RAW connector. ///\brief Contains the main code for the RAW connector.
/// Expects a single commandline argument telling it which stream to connect to, ///
/// then outputs the raw stream to stdout. ///Expects a single commandline argument telling it which stream to connect to,
///then outputs the raw stream to stdout.
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION); Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addOption("stream_name", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the stream to write to stdout.\"}")); conf.addOption("stream_name", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the stream to write to stdout.\"}"));

File diff suppressed because it is too large Load diff

View file

@ -2,145 +2,152 @@
/// Contains the main code for the TS Connector /// Contains the main code for the TS Connector
#include <queue> #include <queue>
#include <string>
#include <iostream>
#include <cmath> #include <cmath>
#include <ctime> #include <ctime>
#include <cstdio> #include <cstdio>
#include <string>
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
#include <unistd.h> #include <unistd.h>
#include <getopt.h> #include <getopt.h>
#include <iostream>
#include <sys/time.h> #include <sys/time.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <sys/types.h> #include <sys/types.h>
#include <mist/socket.h> #include <mist/socket.h>
#include <mist/config.h> #include <mist/config.h>
#include <mist/stream.h> #include <mist/stream.h>
#include <mist/ts_packet.h> //TS support #include <mist/ts_packet.h> //TS support
#include <mist/dtsc.h> //DTSC support #include <mist/dtsc.h> //DTSC support
#include <mist/mp4.h> //For initdata conversion #include <mist/mp4.h> //For initdata conversion
/// The main function of the connector
/// \param conn A connection with the client
/// \param streamname The name of the stream
int TS_Handler(Socket::Connection conn, std::string streamname){
std::string ToPack;
TS::Packet PackData;
std::string DTMIData;
int PacketNumber = 0;
long long unsigned int TimeStamp = 0;
int ThisNaluSize;
char VideoCounter = 0;
char AudioCounter = 0;
bool WritePesHeader;
bool IsKeyFrame;
bool FirstKeyFrame = true;
bool FirstIDRInKeyFrame;
MP4::AVCC avccbox;
bool haveAvcc = false;
DTSC::Stream Strm; ///\brief Holds everything unique to the TS Connector
bool inited = false; namespace Connector_TS {
Socket::Connection ss; ///\brief Main function for the TS Connector
///\param conn A socket describing the connection the client.
///\param streamName The stream to connect to.
///\return The exit code of the connector.
int tsConnector(Socket::Connection conn, std::string streamName){
std::string ToPack;
TS::Packet PackData;
std::string DTMIData;
int PacketNumber = 0;
long long unsigned int TimeStamp = 0;
int ThisNaluSize;
char VideoCounter = 0;
char AudioCounter = 0;
bool WritePesHeader;
bool IsKeyFrame;
bool FirstKeyFrame = true;
bool FirstIDRInKeyFrame;
MP4::AVCC avccbox;
bool haveAvcc = false;
while (conn.connected()){ DTSC::Stream Strm;
if ( !inited){ bool inited = false;
ss = Util::Stream::getStream(streamname); Socket::Connection ss;
if ( !ss.connected()){
#if DEBUG >= 1 while (conn.connected()){
fprintf(stderr, "Could not connect to server!\n"); if ( !inited){
#endif ss = Util::Stream::getStream(streamName);
conn.close(); if ( !ss.connected()){
break; #if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
conn.close();
break;
}
ss.SendNow("p\n");
inited = true;
} }
ss.SendNow("p\n"); if (ss.spool()){
inited = true; while (Strm.parsePacket(ss.Received())){
} if ( !haveAvcc){
if (ss.spool()){ avccbox.setPayload(Strm.metadata["video"]["init"].asString());
while (Strm.parsePacket(ss.Received())){ haveAvcc = true;
if ( !haveAvcc){
avccbox.setPayload(Strm.metadata["video"]["init"].asString());
haveAvcc = true;
}
std::stringstream TSBuf;
Socket::Buffer ToPack;
//write PAT and PMT TS packets
if (PacketNumber == 0){
PackData.DefaultPAT();
TSBuf.write(PackData.ToString(), 188);
PackData.DefaultPMT();
TSBuf.write(PackData.ToString(), 188);
PacketNumber += 2;
}
int PIDno = 0;
char * ContCounter = 0;
if (Strm.lastType() == DTSC::VIDEO){
IsKeyFrame = Strm.getPacket(0).isMember("keyframe");
if (IsKeyFrame){
TimeStamp = (Strm.getPacket(0)["time"].asInt() * 27000);
} }
ToPack.append(avccbox.asAnnexB()); std::stringstream TSBuf;
while (Strm.lastData().size()){ Socket::Buffer ToPack;
ThisNaluSize = (Strm.lastData()[0] << 24) + (Strm.lastData()[1] << 16) + (Strm.lastData()[2] << 8) + Strm.lastData()[3]; //write PAT and PMT TS packets
Strm.lastData().replace(0, 4, TS::NalHeader, 4); if (PacketNumber == 0){
if (ThisNaluSize + 4 == Strm.lastData().size()){ PackData.DefaultPAT();
ToPack.append(Strm.lastData()); TSBuf.write(PackData.ToString(), 188);
break; PackData.DefaultPMT();
}else{ TSBuf.write(PackData.ToString(), 188);
ToPack.append(Strm.lastData().c_str(), ThisNaluSize + 4); PacketNumber += 2;
Strm.lastData().erase(0, ThisNaluSize + 4); }
int PIDno = 0;
char * ContCounter = 0;
if (Strm.lastType() == DTSC::VIDEO){
IsKeyFrame = Strm.getPacket(0).isMember("keyframe");
if (IsKeyFrame){
TimeStamp = (Strm.getPacket(0)["time"].asInt() * 27000);
} }
ToPack.append(avccbox.asAnnexB());
while (Strm.lastData().size()){
ThisNaluSize = (Strm.lastData()[0] << 24) + (Strm.lastData()[1] << 16) + (Strm.lastData()[2] << 8) + Strm.lastData()[3];
Strm.lastData().replace(0, 4, TS::NalHeader, 4);
if (ThisNaluSize + 4 == Strm.lastData().size()){
ToPack.append(Strm.lastData());
break;
}else{
ToPack.append(Strm.lastData().c_str(), ThisNaluSize + 4);
Strm.lastData().erase(0, ThisNaluSize + 4);
}
}
ToPack.prepend(TS::Packet::getPESVideoLeadIn(0ul, Strm.getPacket(0)["time"].asInt() * 90));
PIDno = 0x100;
ContCounter = &VideoCounter;
}else if (Strm.lastType() == DTSC::AUDIO){
ToPack.append(TS::GetAudioHeader(Strm.lastData().size(), Strm.metadata["audio"]["init"].asString()));
ToPack.append(Strm.lastData());
ToPack.prepend(TS::Packet::getPESAudioLeadIn(ToPack.bytes(1073741824ul), Strm.getPacket(0)["time"].asInt() * 90));
PIDno = 0x101;
ContCounter = &AudioCounter;
} }
ToPack.prepend(TS::Packet::getPESVideoLeadIn(0ul, Strm.getPacket(0)["time"].asInt() * 90));
PIDno = 0x100;
ContCounter = &VideoCounter;
}else if (Strm.lastType() == DTSC::AUDIO){
ToPack.append(TS::GetAudioHeader(Strm.lastData().size(), Strm.metadata["audio"]["init"].asString()));
ToPack.append(Strm.lastData());
ToPack.prepend(TS::Packet::getPESAudioLeadIn(ToPack.bytes(1073741824ul), Strm.getPacket(0)["time"].asInt() * 90));
PIDno = 0x101;
ContCounter = &AudioCounter;
}
//initial packet //initial packet
PackData.Clear();
PackData.PID(PIDno);
PackData.ContinuityCounter(( *ContCounter)++);
PackData.UnitStart(1);
if (IsKeyFrame){
PackData.RandomAccess(1);
PackData.PCR(TimeStamp);
}
unsigned int toSend = PackData.AddStuffing(ToPack.bytes(184));
std::string gonnaSend = ToPack.remove(toSend);
PackData.FillFree(gonnaSend);
TSBuf.write(PackData.ToString(), 188);
PacketNumber++;
//rest of packets
while (ToPack.size()){
PackData.Clear(); PackData.Clear();
PackData.PID(PIDno); PackData.PID(PIDno);
PackData.ContinuityCounter(( *ContCounter)++); PackData.ContinuityCounter(( *ContCounter)++);
toSend = PackData.AddStuffing(ToPack.bytes(184)); PackData.UnitStart(1);
gonnaSend = ToPack.remove(toSend); if (IsKeyFrame){
PackData.RandomAccess(1);
PackData.PCR(TimeStamp);
}
unsigned int toSend = PackData.AddStuffing(ToPack.bytes(184));
std::string gonnaSend = ToPack.remove(toSend);
PackData.FillFree(gonnaSend); PackData.FillFree(gonnaSend);
TSBuf.write(PackData.ToString(), 188); TSBuf.write(PackData.ToString(), 188);
PacketNumber++; PacketNumber++;
}
TSBuf.flush(); //rest of packets
if (TSBuf.str().size()){ while (ToPack.size()){
conn.SendNow(TSBuf.str().c_str(), TSBuf.str().size()); PackData.Clear();
PackData.PID(PIDno);
PackData.ContinuityCounter(( *ContCounter)++);
toSend = PackData.AddStuffing(ToPack.bytes(184));
gonnaSend = ToPack.remove(toSend);
PackData.FillFree(gonnaSend);
TSBuf.write(PackData.ToString(), 188);
PacketNumber++;
}
TSBuf.flush();
if (TSBuf.str().size()){
conn.SendNow(TSBuf.str().c_str(), TSBuf.str().size());
TSBuf.str("");
}
TSBuf.str(""); TSBuf.str("");
PacketNumber = 0;
} }
TSBuf.str("");
PacketNumber = 0;
} }
} }
return 0;
} }
return 0;
} }
int main(int argc, char ** argv){ int main(int argc, char ** argv){
@ -160,7 +167,7 @@ int main(int argc, char ** argv){
if (S.connected()){ //check if the new connection is valid if (S.connected()){ //check if the new connection is valid
pid_t myid = fork(); pid_t myid = fork();
if (myid == 0){ //if new child, start MAINHANDLER if (myid == 0){ //if new child, start MAINHANDLER
return TS_Handler(S, conf.getString("streamname")); return Connector_TS::tsConnector(S, conf.getString("streamname"));
}else{ //otherwise, do nothing or output debugging text }else{ //otherwise, do nothing or output debugging text
#if DEBUG >= 5 #if DEBUG >= 5
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket()); fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());