diff --git a/src/connectors/conn_http.cpp b/src/connectors/conn_http.cpp index 0b5e7e4e..296cfb5f 100644 --- a/src/connectors/conn_http.cpp +++ b/src/connectors/conn_http.cpp @@ -30,18 +30,18 @@ namespace Connector_HTTP { class ConnConn{ public: Socket::Connection * conn; ///< The socket of this connection - unsigned int lastuse; ///< Seconds since last use of this connection. - tthread::mutex in_use; ///< Mutex for this connection. - /// Constructor that sets the socket and lastuse to 0. + unsigned int lastUse; ///< Seconds since last use of this connection. + tthread::mutex inUse; ///< Mutex for this connection. + /// Constructor that sets the socket and lastUse to 0. ConnConn(){ conn = 0; - lastuse = 0; + lastUse = 0; } ; - /// Constructor that sets lastuse to 0, but socket to s. + /// Constructor that sets lastUse to 0, but socket to s. ConnConn(Socket::Connection * s){ conn = s; - lastuse = 0; + lastUse = 0; } ; /// Destructor that deletes the socket if non-null. @@ -55,44 +55,51 @@ namespace Connector_HTTP { ; }; - std::map connconn; ///< Connections to connectors - std::set active_threads; ///< Holds currently active threads - std::set done_threads; ///< Holds threads that are done and ready to be joined. - tthread::mutex thread_mutex; ///< Mutex for adding/removing threads. - tthread::mutex conn_mutex; ///< Mutex for adding/removing connector connections. - tthread::mutex timeout_mutex; ///< Mutex for timeout thread. + std::map connectorConnections; ///< Connections to connectors + std::set activeThreads; ///< Holds currently active threads + std::set doneThreads; ///< Holds threads that are done and ready to be joined. + tthread::mutex threadMutex; ///< Mutex for adding/removing threads. + tthread::mutex connMutex; ///< Mutex for adding/removing connector connections. + tthread::mutex timeoutMutex; ///< Mutex for timeout thread. tthread::thread * timeouter = 0; ///< Thread that times out connections to connectors. + ///\brief Function run as a thread to timeout requests on the proxy. + ///\param n A NULL-pointer void proxyTimeoutThread(void * n){ n = 0; //prevent unused variable warning - tthread::lock_guard guard(timeout_mutex); + tthread::lock_guard guard(timeoutMutex); while (true){ { - tthread::lock_guard guard(conn_mutex); - if (connconn.empty()){ + tthread::lock_guard guard(connMutex); + if (connectorConnections.empty()){ return; } std::map::iterator it; - for (it = connconn.begin(); it != connconn.end(); it++){ - if ( !it->second->conn->connected() || it->second->lastuse++ > 15){ - if (it->second->in_use.try_lock()){ - it->second->in_use.unlock(); + for (it = connectorConnections.begin(); it != connectorConnections.end(); it++){ + if ( !it->second->conn->connected() || it->second->lastUse++ > 15){ + if (it->second->inUse.try_lock()){ + it->second->inUse.unlock(); delete it->second; - connconn.erase(it); - it = connconn.begin(); //get a valid iterator - if (it == connconn.end()){ + connectorConnections.erase(it); + it = connectorConnections.begin(); //get a valid iterator + if (it == connectorConnections.end()){ return; } } } } - conn_mutex.unlock(); + connMutex.unlock(); } usleep(1000000); //sleep 1 second and re-check } } - /// Handles requests without associated handler, displaying a nice friendly error message. + ///\brief Handles requests without associated handler. + /// + ///Displays a friendly error message. + ///\param H The request to be handled. + ///\param conn The connection to the client that issued the request. + ///\return A timestamp indicating when the request was parsed. long long int proxyHandleUnsupported(HTTP::Parser & H, Socket::Connection * conn){ H.Clean(); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); @@ -103,6 +110,12 @@ namespace Connector_HTTP { return ret; } + ///\brief Handles requests that have timed out. + /// + ///Displays a friendly error message. + ///\param H The request that was being handled upon timeout. + ///\param conn The connection to the client that issued the request. + ///\return A timestamp indicating when the request was parsed. long long int proxyHandleTimeout(HTTP::Parser & H, Socket::Connection * conn){ H.Clean(); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); @@ -113,7 +126,19 @@ namespace Connector_HTTP { return ret; } - /// Handles internal requests. + ///\brief Handles requests within the proxy. + /// + ///Currently supported urls: + /// - /crossdomain.xml + /// - /clientaccesspolicy.xml + /// - *.ico (for favicon) + /// - /info_[streamname].js (stream info) + /// - /embed_[streamname].js (embed info) + /// + ///Unsupported urls default to proxyHandleUnsupported( ). + ///\param H The request to be handled. + ///\param conn The connection to the client that issued the request. + ///\return A timestamp indicating when the request was parsed. long long int proxyHandleInternal(HTTP::Parser & H, Socket::Connection * conn){ std::string url = H.getUrl(); @@ -263,7 +288,11 @@ namespace Connector_HTTP { return proxyHandleUnsupported(H, conn); //anything else doesn't get handled } - /// Handles requests without associated handler, displaying a nice friendly error message. + ///\brief Handles requests by dispatching them to the corresponding connector. + ///\param H The request to be handled. + ///\param conn The connection to the client that issued the request. + ///\param connector The type of connector to be invoked. + ///\return A timestamp indicating when the request was parsed. long long int proxyHandleThroughConnector(HTTP::Parser & H, Socket::Connection * conn, std::string & connector){ //create a unique ID based on a hash of the user agent and host, followed by the stream name and connector std::string uid = Secure::md5(H.GetHeader("User-Agent") + conn->getHost()) + "_" + H.GetVar("stream") + "_" + connector; @@ -275,13 +304,13 @@ namespace Connector_HTTP { H.Clean(); //check if a connection exists, and if not create one - conn_mutex.lock(); - if ( !connconn.count(uid) || !connconn[uid]->conn->connected()){ - if (connconn.count(uid)){ - connconn.erase(uid); + connMutex.lock(); + if ( !connectorConnections.count(uid) || !connectorConnections[uid]->conn->connected()){ + if (connectorConnections.count(uid)){ + connectorConnections.erase(uid); } - connconn[uid] = new ConnConn(new Socket::Connection("/tmp/mist/http_" + connector)); - connconn[uid]->conn->setBlocking(false); //do not block on spool() with no data + connectorConnections[uid] = new ConnConn(new Socket::Connection("/tmp/mist/http_" + connector)); + connectorConnections[uid]->conn->setBlocking(false); //do not block on spool() with no data #if DEBUG >= 4 std::cout << "Created new connection " << uid << std::endl; #endif @@ -291,56 +320,56 @@ namespace Connector_HTTP { #endif } //start a new timeout thread, if neccesary - if (timeout_mutex.try_lock()){ + if (timeoutMutex.try_lock()){ if (timeouter){ timeouter->join(); delete timeouter; } timeouter = new tthread::thread(Connector_HTTP::proxyTimeoutThread, 0); - timeout_mutex.unlock(); + timeoutMutex.unlock(); } - conn_mutex.unlock(); + connMutex.unlock(); //lock the mutex for this connection, and handle the request - tthread::lock_guard guard(connconn[uid]->in_use); + tthread::lock_guard guard(connectorConnections[uid]->inUse); //if the server connection is dead, handle as timeout. - if ( !connconn.count(uid) || !connconn[uid]->conn->connected()){ - connconn[uid]->conn->close(); + if ( !connectorConnections.count(uid) || !connectorConnections[uid]->conn->connected()){ + connectorConnections[uid]->conn->close(); return proxyHandleTimeout(H, conn); } //forward the original request - connconn[uid]->conn->SendNow(request); - connconn[uid]->lastuse = 0; + connectorConnections[uid]->conn->SendNow(request); + connectorConnections[uid]->lastUse = 0; unsigned int timeout = 0; unsigned int retries = 0; //wait for a response - while (connconn.count(uid) && connconn[uid]->conn->connected() && conn->connected()){ + while (connectorConnections.count(uid) && connectorConnections[uid]->conn->connected() && conn->connected()){ conn->spool(); - if (connconn[uid]->conn->Received().size() || connconn[uid]->conn->spool()){ + if (connectorConnections[uid]->conn->Received().size() || connectorConnections[uid]->conn->spool()){ //make sure we end in a \n - if ( *(connconn[uid]->conn->Received().get().rbegin()) != '\n'){ - std::string tmp = connconn[uid]->conn->Received().get(); - connconn[uid]->conn->Received().get().clear(); - if (connconn[uid]->conn->Received().size()){ - connconn[uid]->conn->Received().get().insert(0, tmp); + if ( *(connectorConnections[uid]->conn->Received().get().rbegin()) != '\n'){ + std::string tmp = connectorConnections[uid]->conn->Received().get(); + connectorConnections[uid]->conn->Received().get().clear(); + if (connectorConnections[uid]->conn->Received().size()){ + connectorConnections[uid]->conn->Received().get().insert(0, tmp); }else{ - connconn[uid]->conn->Received().append(tmp); + connectorConnections[uid]->conn->Received().append(tmp); } } //check if the whole response was received - if (H.Read(connconn[uid]->conn->Received().get())){ + if (H.Read(connectorConnections[uid]->conn->Received().get())){ //208 means the fragment is too new, retry in 3s if (H.url == "208"){ retries++; if (retries >= 5){ std::cout << "[5 retry-laters, cancelled]" << std::endl; - connconn[uid]->conn->close(); + connectorConnections[uid]->conn->close(); return proxyHandleTimeout(H, conn); } - connconn[uid]->lastuse = 0; + connectorConnections[uid]->lastUse = 0; timeout = 0; Util::sleep(3000); - connconn[uid]->conn->SendNow(request); + connectorConnections[uid]->conn->SendNow(request); H.Clean(); continue; } @@ -350,16 +379,16 @@ namespace Connector_HTTP { //keep trying unless the timeout triggers if (timeout++ > 4000){ std::cout << "[20s timeout triggered]" << std::endl; - connconn[uid]->conn->close(); + connectorConnections[uid]->conn->close(); return proxyHandleTimeout(H, conn); }else{ Util::sleep(5); } } } - if ( !connconn.count(uid) || !connconn[uid]->conn->connected() || !conn->connected()){ + if ( !connectorConnections.count(uid) || !connectorConnections[uid]->conn->connected() || !conn->connected()){ //failure, disconnect and sent error to user - connconn[uid]->conn->close(); + connectorConnections[uid]->conn->close(); return proxyHandleTimeout(H, conn); }else{ long long int ret = Util::getMS(); @@ -375,9 +404,9 @@ namespace Connector_HTTP { H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); conn->SendNow(H.BuildResponse("200", "OK")); //switch out the connection for an empty one - it makes no sense to keep these globally - Socket::Connection * myConn = connconn[uid]->conn; - connconn[uid]->conn = new Socket::Connection(); - connconn[uid]->in_use.unlock(); + Socket::Connection * myConn = connectorConnections[uid]->conn; + connectorConnections[uid]->conn = new Socket::Connection(); + connectorConnections[uid]->inUse.unlock(); //continue sending data from this socket and keep it permanently in use while (myConn->connected() && conn->connected()){ if (myConn->Received().size() || myConn->spool()){ @@ -396,12 +425,16 @@ namespace Connector_HTTP { } } - /// Returns the name of the HTTP connector the given request should be served by. - /// Can currently return: - /// - none (request not supported) - /// - internal (request fed from information internal to this connector) - /// - dynamic (request fed from http_dynamic connector) - /// - progressive (request fed from http_progressive connector) + ///\brief Determines the type of connector to be used for handling a request. + ///\param H The request to be handled.. + ///\return A string indicating the type of connector. + ///Possible values are: + /// - "none" The request is not supported. + /// - "internal" The request should be handled by the proxy itself. + /// - "dynamic" The request should be dispatched to the HTTP Dynamic Connector + /// - "progressive" The request should be dispatched to the HTTP Progressive Connector + /// - "smooth" The request should be dispatched to the HTTP Smooth Connector + /// - "live" The request should be dispatched to the HTTP Live Connector std::string proxyGetHandleType(HTTP::Parser & H){ std::string url = H.getUrl(); if (url.find("/dynamic/") != std::string::npos){ @@ -449,7 +482,8 @@ namespace Connector_HTTP { return "none"; } - /// Thread for handling a single HTTP connection + ///\brief Function run as a thread to handle a single HTTP connection. + ///\param pointer A Socket::Connection* indicating the connection to th client. void proxyHandleHTTPConnection(void * pointer){ Socket::Connection * conn = (Socket::Connection *)pointer; conn->setBlocking(false); //do not block on conn.spool() when no data is available @@ -504,17 +538,17 @@ namespace Connector_HTTP { //close and remove the connection conn->close(); delete conn; - //remove this thread from active_threads and add it to done_threads. - thread_mutex.lock(); - for (std::set::iterator it = active_threads.begin(); it != active_threads.end(); it++){ + //remove this thread from activeThreads and add it to doneThreads. + threadMutex.lock(); + for (std::set::iterator it = activeThreads.begin(); it != activeThreads.end(); it++){ if (( *it)->get_id() == tthread::this_thread::get_id()){ tthread::thread * T = ( *it); - active_threads.erase(T); - done_threads.insert(T); + activeThreads.erase(T); + doneThreads.insert(T); break; } } - thread_mutex.unlock(); + threadMutex.unlock(); } } //Connector_HTTP namespace @@ -533,17 +567,17 @@ int main(int argc, char ** argv){ Socket::Connection S = server_socket.accept(); if (S.connected()){ //check if the new connection is valid //lock the thread mutex and spawn a new thread for this connection - Connector_HTTP::thread_mutex.lock(); + Connector_HTTP::threadMutex.lock(); tthread::thread * T = new tthread::thread(Connector_HTTP::proxyHandleHTTPConnection, (void *)(new Socket::Connection(S))); - Connector_HTTP::active_threads.insert(T); + Connector_HTTP::activeThreads.insert(T); //clean up any threads that may have finished - while ( !Connector_HTTP::done_threads.empty()){ - T = *Connector_HTTP::done_threads.begin(); + while ( !Connector_HTTP::doneThreads.empty()){ + T = *Connector_HTTP::doneThreads.begin(); T->join(); - Connector_HTTP::done_threads.erase(T); + Connector_HTTP::doneThreads.erase(T); delete T; } - Connector_HTTP::thread_mutex.unlock(); + Connector_HTTP::threadMutex.unlock(); }else{ Util::sleep(10); //sleep 10ms } @@ -553,16 +587,16 @@ int main(int argc, char ** argv){ //wait for existing connections to drop bool repeat = true; while (repeat){ - Connector_HTTP::thread_mutex.lock(); - repeat = !Connector_HTTP::active_threads.empty(); + Connector_HTTP::threadMutex.lock(); + repeat = !Connector_HTTP::activeThreads.empty(); //clean up any threads that may have finished - while ( !Connector_HTTP::done_threads.empty()){ - tthread::thread * T = *Connector_HTTP::done_threads.begin(); + while ( !Connector_HTTP::doneThreads.empty()){ + tthread::thread * T = *Connector_HTTP::doneThreads.begin(); T->join(); - Connector_HTTP::done_threads.erase(T); + Connector_HTTP::doneThreads.erase(T); delete T; } - Connector_HTTP::thread_mutex.unlock(); + Connector_HTTP::threadMutex.unlock(); if (repeat){ Util::sleep(100); //sleep 100ms } diff --git a/src/connectors/conn_http_dynamic.cpp b/src/connectors/conn_http_dynamic.cpp index 20234891..5cafed0b 100644 --- a/src/connectors/conn_http_dynamic.cpp +++ b/src/connectors/conn_http_dynamic.cpp @@ -26,11 +26,11 @@ /// Holds everything unique to HTTP Connectors. namespace Connector_HTTP { ///\brief Builds a bootstrap for use in HTTP Dynamic streaming. - ///\param MovieId The name of the movie. + ///\param streamName The name of the stream. ///\param metadata The current metadata, used to generate the index. ///\param fragnum The index of the current fragment ///\return The generated bootstrap. - std::string dynamicBootstrap(std::string & MovieId, JSON::Value & metadata, int fragnum = 0){ + std::string dynamicBootstrap(std::string & streamName, JSON::Value & metadata, int fragnum = 0){ std::string empty; MP4::ASRT asrt; @@ -80,7 +80,7 @@ namespace Connector_HTTP { abst.setLive(false); abst.setCurrentMediaTime(metadata["lastms"].asInt()); abst.setSmpteTimeCodeOffset(0); - abst.setMovieIdentifier(MovieId); + abst.setMovieIdentifier(streamName); abst.setSegmentRunTable(asrt, 0); abst.setFragmentRunTable(afrt, 0); @@ -91,24 +91,24 @@ namespace Connector_HTTP { } ///\brief Builds an index file for HTTP Dynamic streaming. - ///\param MovieId The name of the movie. + ///\param streamName The name of the stream. ///\param metadata The current metadata, used to generate the index. ///\return The index file for HTTP Dynamic Streaming. - std::string dynamicIndex(std::string & MovieId, JSON::Value & metadata){ + std::string dynamicIndex(std::string & streamName, JSON::Value & metadata){ std::string Result; if (metadata.isMember("vod")){ Result = "\n" "\n" - "" + MovieId + "\n" + "" + streamName + "\n" "" + metadata["video"]["width"].asString() + "\n" "" + metadata["video"]["height"].asString() + "\n" "" + metadata["length"].asString() + ".000\n" "video/mp4\n" "recorded\n" "streaming\n" - "" + Base64::encode(dynamicBootstrap(MovieId, metadata)) + "\n" - "\n" + "" + Base64::encode(dynamicBootstrap(streamName, metadata)) + "\n" + "\n" "AgAKb25NZXRhRGF0YQMAAAk=\n" "\n" "\n"; @@ -116,15 +116,15 @@ namespace Connector_HTTP { Result = "\n" "\n" - "" + MovieId + "\n" + "" + streamName + "\n" "" "video/mp4\n" "live\n" "streaming\n" - "\n" + "\n" "AgAKb25NZXRhRGF0YQMAAAk=\n" "\n" - "\n" + "\n" "\n"; } #if DEBUG >= 8 diff --git a/src/connectors/conn_http_live.cpp b/src/connectors/conn_http_live.cpp index bb7606e5..25b05c82 100644 --- a/src/connectors/conn_http_live.cpp +++ b/src/connectors/conn_http_live.cpp @@ -26,10 +26,9 @@ /// Holds everything unique to HTTP Connectors. namespace Connector_HTTP { ///\brief Builds an index file for HTTP Live streaming. - ///\param MovieId The name of the movie. ///\param metadata The current metadata, used to generate the index. ///\return The index file for HTTP Live Streaming. - std::string liveIndex(std::string & MovieId, JSON::Value & metadata){ + std::string liveIndex(JSON::Value & metadata){ std::stringstream Result; if ( !metadata.isMember("live")){ int longestFragment = 0; @@ -62,7 +61,7 @@ namespace Connector_HTTP { return Result.str(); } //liveIndex - ///\brief Main function for the HTTP HLS Connector + ///\brief Main function for the HTTP Live Connector ///\param conn A socket describing the connection the client. ///\return The exit code of the connector. int liveConnector(Socket::Connection conn){ @@ -175,7 +174,7 @@ namespace Connector_HTTP { HTTP_S.Clean(); HTTP_S.SetHeader("Content-Type", manifestType); HTTP_S.SetHeader("Cache-Control", "no-cache"); - std::string manifest = liveIndex(streamname, Strm.metadata); + std::string manifest = liveIndex(Strm.metadata); HTTP_S.SetBody(manifest); conn.SendNow(HTTP_S.BuildResponse("200", "OK")); } diff --git a/src/connectors/conn_http_smooth.cpp b/src/connectors/conn_http_smooth.cpp index cd9dfce5..4ad51f08 100644 --- a/src/connectors/conn_http_smooth.cpp +++ b/src/connectors/conn_http_smooth.cpp @@ -28,10 +28,9 @@ ///\brief Holds everything unique to HTTP Connectors. namespace Connector_HTTP { ///\brief Builds an index file for HTTP Smooth streaming. - ///\param MovieId The name of the movie. ///\param metadata The current metadata, used to generate the index. ///\return The index file for HTTP Smooth Streaming. - std::string smoothIndex(std::string & MovieId, JSON::Value & metadata){ + std::string smoothIndex(JSON::Value & metadata){ std::stringstream Result; Result << "\n"; Result << " #include -/// Contains the main code for the RAW connector. -/// Expects a single commandline argument telling it which stream to connect to, -/// then outputs the raw stream to stdout. +///\brief Contains the main code for the RAW connector. +/// +///Expects a single commandline argument telling it which stream to connect to, +///then outputs the raw stream to stdout. int main(int argc, char ** argv){ Util::Config conf(argv[0], PACKAGE_VERSION); conf.addOption("stream_name", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the stream to write to stdout.\"}")); diff --git a/src/connectors/conn_rtmp.cpp b/src/connectors/conn_rtmp.cpp index db16cf59..24e0be1f 100644 --- a/src/connectors/conn_rtmp.cpp +++ b/src/connectors/conn_rtmp.cpp @@ -2,6 +2,8 @@ /// Contains the main code for the RTMP Connector #include +#include + #include #include #include @@ -10,7 +12,7 @@ #include #include #include -#include + #include #include #include @@ -19,586 +21,590 @@ #include #include -/// Holds all functions and data unique to the RTMP Connector +///\brief Holds everything unique to the RTMP Connector namespace Connector_RTMP { //for connection to server - bool ready4data = false; ///< Set to true when streaming starts. - bool inited = false; ///< Set to true when ready to connect to Buffer. - bool nostats = false; ///< Set to true if no stats should be sent anymore (push mode). - bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled. + bool ready4data = false; ///< Indicates whether streaming can start. + bool inited = false; ///< Indicates whether we are ready to connect to the Buffer. + bool noStats = false; ///< Indicates when no stats should be sent anymore. Used in push mode. + bool stopParsing = false; ///< Indicates when to stop all parsing. //for reply to play command - int play_trans = -1; - int play_streamid = -1; - int play_msgtype = -1; + int playTransaction = -1;///= 8 + std::cerr << amfReply.Print() << std::endl; + #endif + if (messageType == 17){ + Socket.SendNow(RTMPStream::SendChunk(3, messageType, streamId, (char)0 + amfReply.Pack())); + }else{ + Socket.SendNow(RTMPStream::SendChunk(3, messageType, streamId, amfReply.Pack())); + } + } //sendCommand - ///\brief Main Connector_RTMP function + ///\brief Parses a single AMF command message, and sends a direct response through sendCommand(). + ///\param amfData The received request. + ///\param messageType The type of message. + ///\param streamId The ID of the AMF stream. + void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId){ + #if DEBUG >= 5 + fprintf(stderr, "Received command: %s\n", amfData.Print().c_str()); + #endif + #if DEBUG >= 8 + fprintf(stderr, "AMF0 command: %s\n", amfData.getContentP(0)->StrValue().c_str()); + #endif + if (amfData.getContentP(0)->StrValue() == "connect"){ + double objencoding = 0; + if (amfData.getContentP(2)->getContentP("objectEncoding")){ + objencoding = amfData.getContentP(2)->getContentP("objectEncoding")->NumValue(); + } + #if DEBUG >= 6 + int tmpint; + if (amfData.getContentP(2)->getContentP("videoCodecs")){ + tmpint = (int)amfData.getContentP(2)->getContentP("videoCodecs")->NumValue(); + if (tmpint & 0x04){ + fprintf(stderr, "Sorensen video support detected\n"); + } + if (tmpint & 0x80){ + fprintf(stderr, "H264 video support detected\n"); + } + } + if (amfData.getContentP(2)->getContentP("audioCodecs")){ + tmpint = (int)amfData.getContentP(2)->getContentP("audioCodecs")->NumValue(); + if (tmpint & 0x04){ + fprintf(stderr, "MP3 audio support detected\n"); + } + if (tmpint & 0x400){ + fprintf(stderr, "AAC audio support detected\n"); + } + } + #endif + RTMPStream::chunk_snd_max = 4096; + Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max)); //send chunk size max (msg 1) + Socket.Send(RTMPStream::SendCTL(5, RTMPStream::snd_window_size)); //send window acknowledgement size (msg 5) + Socket.Send(RTMPStream::SendCTL(6, RTMPStream::rec_window_size)); //send rec window acknowledgement size (msg 6) + Socket.Send(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 + //send a _result reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "_result")); //result success + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("")); //server properties + amfReply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,5,5,2004")); + amfReply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31)); + amfReply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); + amfReply.addContent(AMF::Object("")); //info + amfReply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfReply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success")); + amfReply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded.")); + amfReply.getContentP(3)->addContent(AMF::Object("clientid", 1337)); + amfReply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding)); + //amfReply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); + //amfReply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); + sendCommand(amfReply, messageType, streamId); + //send onBWDone packet - no clue what it is, but real server sends it... + //amfReply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + //amfReply.addContent(AMF::Object("", "onBWDone"));//result + //amfReply.addContent(amfData.getContent(1));//same transaction ID + //amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null + //sendCommand(amfReply, messageType, streamId); + return; + } //connect + if (amfData.getContentP(0)->StrValue() == "createStream"){ + //send a _result reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "_result")); //result success + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfReply.addContent(AMF::Object("", (double)1)); //stream ID - we use 1 + sendCommand(amfReply, messageType, streamId); + Socket.Send(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 + return; + } //createStream + if ((amfData.getContentP(0)->StrValue() == "closeStream") || (amfData.getContentP(0)->StrValue() == "deleteStream")){ + if (ss.connected()){ + ss.close(); + } + return; + } + if ((amfData.getContentP(0)->StrValue() == "FCPublish") || (amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")){ + // ignored + return; + } + if ((amfData.getContentP(0)->StrValue() == "getStreamLength") || (amfData.getContentP(0)->StrValue() == "getMovLen")){ + //send a _result reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "_result")); //result success + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfReply.addContent(AMF::Object("", (double)0)); //zero length + sendCommand(amfReply, messageType, streamId); + return; + } //getStreamLength + if ((amfData.getContentP(0)->StrValue() == "publish")){ + if (amfData.getContentP(3)){ + streamName = amfData.getContentP(3)->StrValue(); + /// \todo implement push for MistPlayer or restrict and change to getLive + ss = Util::Stream::getStream(streamName); + if ( !ss.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + Socket.close(); //disconnect user + return; + } + ss.Send("P "); + ss.Send(Socket.getHost().c_str()); + ss.Send("\n"); + noStats = true; + } + //send a _result reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "_result")); //result success + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfReply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL)); //publish success? + sendCommand(amfReply, messageType, streamId); + Socket.Send(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 + //send a status reply + amfReply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "onStatus")); //status reply + amfReply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER)); //same transaction ID + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfReply.addContent(AMF::Object("")); //info + amfReply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfReply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); + amfReply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); + amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfReply, messageType, streamId); + return; + } //getStreamLength + if (amfData.getContentP(0)->StrValue() == "checkBandwidth"){ + //send a _result reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "_result")); //result success + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + sendCommand(amfReply, messageType, streamId); + return; + } //checkBandwidth + if ((amfData.getContentP(0)->StrValue() == "play") || (amfData.getContentP(0)->StrValue() == "play2")){ + //set reply number and stream name, actual reply is sent up in the ss.spool() handler + playTransaction = amfData.getContentP(1)->NumValue(); + playMessageType = messageType; + playStreamId = streamId; + streamName = amfData.getContentP(3)->StrValue(); + Connector_RTMP::ready4data = true; //start sending video data! + return; + } //play + if ((amfData.getContentP(0)->StrValue() == "seek")){ + //set reply number and stream name, actual reply is sent up in the ss.spool() handler + playTransaction = amfData.getContentP(1)->NumValue(); + playMessageType = messageType; + playStreamId = streamId; + streamInited = false; + + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "onStatus")); //status reply + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfReply.addContent(AMF::Object("")); //info + amfReply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfReply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Seek.Notify")); + amfReply.getContentP(3)->addContent(AMF::Object("description", "Seeking to the specified time")); + amfReply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfReply, playMessageType, playStreamId); + ss.Send("s "); + ss.Send(JSON::Value((long long int)amfData.getContentP(3)->NumValue()).asString().c_str()); + ss.Send("\n"); + return; + } //seek + if ((amfData.getContentP(0)->StrValue() == "pauseRaw") || (amfData.getContentP(0)->StrValue() == "pause")){ + if (amfData.getContentP(3)->NumValue()){ + ss.Send("q\n"); //quit playing + //send a status reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "onStatus")); //status reply + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfReply.addContent(AMF::Object("")); //info + amfReply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfReply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Pause.Notify")); + amfReply.getContentP(3)->addContent(AMF::Object("description", "Pausing playback")); + amfReply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfReply, playMessageType, playStreamId); + }else{ + ss.Send("p\n"); //start playing + //send a status reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "onStatus")); //status reply + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfReply.addContent(AMF::Object("")); //info + amfReply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfReply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Unpause.Notify")); + amfReply.getContentP(3)->addContent(AMF::Object("description", "Resuming playback")); + amfReply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfReply, playMessageType, playStreamId); + } + return; + } //seek + + #if DEBUG >= 2 + fprintf(stderr, "AMF0 command not processed!\n%s\n", amfData.Print().c_str()); + #endif + } //parseAMFCommand + + ///\brief Gets and parses one RTMP chunk at a time. + ///\param inputBuffer A buffer filled with chunk data. + void parseChunk(Socket::Buffer & inputBuffer){ + //for DTSC conversion + static JSON::Value meta_out; + static std::stringstream prebuffer; // Temporary buffer before sending real data + static bool sending = false; + static unsigned int counter = 0; + //for chunk parsing + static RTMPStream::Chunk next; + FLV::Tag F; + static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER); + static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER); + static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER); + static AMF::Object3 amf3elem("empty", AMF::AMF3_DDV_CONTAINER); + + while (next.Parse(inputBuffer)){ + + //send ACK if we received a whole window + if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)){ + RTMPStream::rec_window_at = RTMPStream::rec_cnt; + Socket.Send(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3) + } + + switch (next.msg_type_id){ + case 0: //does not exist + #if DEBUG >= 2 + fprintf(stderr, "UNKN: Received a zero-type message. Possible data corruption? Aborting!\n"); + #endif + while (inputBuffer.size()){ + inputBuffer.get().clear(); + } + ss.close(); + Socket.close(); + break; //happens when connection breaks unexpectedly + case 1: //set chunk size + RTMPStream::chunk_rec_max = ntohl(*(int*)next.data.c_str()); + #if DEBUG >= 5 + fprintf(stderr, "CTRL: Set chunk size: %i\n", RTMPStream::chunk_rec_max); + #endif + break; + case 2: //abort message - we ignore this one + #if DEBUG >= 5 + fprintf(stderr, "CTRL: Abort message\n"); + #endif + //4 bytes of stream id to drop + break; + case 3: //ack + #if DEBUG >= 8 + fprintf(stderr, "CTRL: Acknowledgement\n"); + #endif + RTMPStream::snd_window_at = ntohl(*(int*)next.data.c_str()); + RTMPStream::snd_window_at = RTMPStream::snd_cnt; + break; + case 4: { + //2 bytes event type, rest = event data + //types: + //0 = stream begin, 4 bytes ID + //1 = stream EOF, 4 bytes ID + //2 = stream dry, 4 bytes ID + //3 = setbufferlen, 4 bytes ID, 4 bytes length + //4 = streamisrecorded, 4 bytes ID + //6 = pingrequest, 4 bytes data + //7 = pingresponse, 4 bytes data + //we don't need to process this + #if DEBUG >= 5 + short int ucmtype = ntohs(*(short int*)next.data.c_str()); + switch (ucmtype){ + case 0: + fprintf(stderr, "CTRL: UCM StreamBegin %i\n", ntohl(*((int*)(next.data.c_str()+2)))); + break; + case 1: + fprintf(stderr, "CTRL: UCM StreamEOF %i\n", ntohl(*((int*)(next.data.c_str()+2)))); + break; + case 2: + fprintf(stderr, "CTRL: UCM StreamDry %i\n", ntohl(*((int*)(next.data.c_str()+2)))); + break; + case 3: + fprintf(stderr, "CTRL: UCM SetBufferLength %i %i\n", ntohl(*((int*)(next.data.c_str()+2))), ntohl(*((int*)(next.data.c_str()+6)))); + break; + case 4: + fprintf(stderr, "CTRL: UCM StreamIsRecorded %i\n", ntohl(*((int*)(next.data.c_str()+2)))); + break; + case 6: + fprintf(stderr, "CTRL: UCM PingRequest %i\n", ntohl(*((int*)(next.data.c_str()+2)))); + break; + case 7: + fprintf(stderr, "CTRL: UCM PingResponse %i\n", ntohl(*((int*)(next.data.c_str()+2)))); + break; + default: + fprintf(stderr, "CTRL: UCM Unknown (%hi)\n", ucmtype); + break; + } + #endif + } + break; + case 5: //window size of other end + #if DEBUG >= 5 + fprintf(stderr, "CTRL: Window size\n"); + #endif + RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str()); + RTMPStream::rec_window_at = RTMPStream::rec_cnt; + Socket.Send(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3) + break; + case 6: + #if DEBUG >= 5 + fprintf(stderr, "CTRL: Set peer bandwidth\n"); + #endif + //4 bytes window size, 1 byte limit type (ignored) + RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str()); + Socket.Send(RTMPStream::SendCTL(5, RTMPStream::snd_window_size)); //send window acknowledgement size (msg 5) + break; + case 8: //audio data + case 9: //video data + case 18: //meta data + if (ss.connected()){ + F.ChunkLoader(next); + JSON::Value pack_out = F.toJSON(meta_out); + if ( !pack_out.isNull()){ + if ( !sending){ + counter++; + if (counter > 8){ + sending = true; + ss.SendNow(meta_out.toNetPacked()); + ss.SendNow(prebuffer.str().c_str(), prebuffer.str().size()); //write buffer + prebuffer.str(""); //clear buffer + ss.SendNow(pack_out.toNetPacked()); + }else{ + prebuffer << pack_out.toNetPacked(); + } + }else{ + ss.SendNow(pack_out.toNetPacked()); + } + } + }else{ + #if DEBUG >= 5 + fprintf(stderr, "Received useless media data\n"); + #endif + Socket.close(); + } + break; + case 15: + #if DEBUG >= 5 + fprintf(stderr, "Received AFM3 data message\n"); + #endif + break; + case 16: + #if DEBUG >= 5 + fprintf(stderr, "Received AFM3 shared object\n"); + #endif + break; + case 17: { + #if DEBUG >= 5 + fprintf(stderr, "Received AFM3 command message\n"); + #endif + if (next.data[0] != 0){ + next.data = next.data.substr(1); + amf3data = AMF::parse3(next.data); + #if DEBUG >= 5 + amf3data.Print(); + #endif + }else{ + #if DEBUG >= 5 + fprintf(stderr, "Received AFM3-0 command message\n"); + #endif + next.data = next.data.substr(1); + amfdata = AMF::parse(next.data); + parseAMFCommand(amfdata, 17, next.msg_stream_id); + } //parsing AMF0-style + } + break; + case 19: + #if DEBUG >= 5 + fprintf(stderr, "Received AFM0 shared object\n"); + #endif + break; + case 20: { //AMF0 command message + amfdata = AMF::parse(next.data); + parseAMFCommand(amfdata, 20, next.msg_stream_id); + } + break; + case 22: + #if DEBUG >= 5 + fprintf(stderr, "Received aggregate message\n"); + #endif + break; + default: + #if DEBUG >= 1 + fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n"); + #endif + stopParsing = true; + break; + } + } + } //parseChunk + + ///\brief Main function for the RTMP Connector ///\param conn A socket describing the connection the client. ///\return The exit code of the connector. - int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ - Socket = conn; - Socket.setBlocking(false); - FLV::Tag tag, init_tag; - DTSC::Stream Strm; + int rtmpConnector(Socket::Connection conn){ + Socket = conn; + Socket.setBlocking(false); + FLV::Tag tag, init_tag; + DTSC::Stream Strm; - while ( !Socket.Received().available(1537) && Socket.connected()){ - Socket.spool(); - Util::sleep(5); - } - RTMPStream::handshake_in = Socket.Received().remove(1537); - RTMPStream::rec_cnt += 1537; - - if (RTMPStream::doHandshake()){ - Socket.SendNow(RTMPStream::handshake_out); - while ( !Socket.Received().available(1536) && Socket.connected()){ + while ( !Socket.Received().available(1537) && Socket.connected()){ Socket.spool(); Util::sleep(5); } - Socket.Received().remove(1536); - RTMPStream::rec_cnt += 1536; -#if DEBUG >= 5 - fprintf(stderr, "Handshake succcess!\n"); -#endif - }else{ -#if DEBUG >= 5 - fprintf(stderr, "Handshake fail!\n"); -#endif - return 0; - } + RTMPStream::handshake_in = Socket.Received().remove(1537); + RTMPStream::rec_cnt += 1537; - unsigned int lastStats = 0; - bool firsttime = true; - - while (Socket.connected()){ - if (Socket.spool() || firsttime){ - parseChunk(Socket.Received()); - firsttime = false; + if (RTMPStream::doHandshake()){ + Socket.SendNow(RTMPStream::handshake_out); + while ( !Socket.Received().available(1536) && Socket.connected()){ + Socket.spool(); + Util::sleep(5); + } + Socket.Received().remove(1536); + RTMPStream::rec_cnt += 1536; + #if DEBUG >= 5 + fprintf(stderr, "Handshake succcess!\n"); + #endif }else{ - Util::sleep(1); //sleep 1ms to prevent high CPU usage + #if DEBUG >= 5 + fprintf(stderr, "Handshake fail!\n"); + #endif + return 0; } - if (ready4data){ - if ( !inited){ - //we are ready, connect the socket! - SS = Util::Stream::getStream(streamname); - if ( !SS.connected()){ -#if DEBUG >= 1 - fprintf(stderr, "Could not connect to server!\n"); -#endif - Socket.close(); //disconnect user - break; - } - SS.setBlocking(false); - SS.SendNow("p\n"); - inited = true; + + unsigned int lastStats = 0; + bool firsttime = true; + + while (Socket.connected()){ + if (Socket.spool() || firsttime){ + parseChunk(Socket.Received()); + firsttime = false; + }else{ + Util::sleep(1); //sleep 1ms to prevent high CPU usage } - if (inited && !nostats){ - long long int now = Util::epoch(); - if (now != lastStats){ - lastStats = now; - SS.SendNow(Socket.getStats("RTMP").c_str()); - } - } - if (SS.spool()){ - while (Strm.parsePacket(SS.Received())){ - if (play_trans != -1){ - //send a status reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus")); //status reply - amfreply.addContent(AMF::Object("", (double)play_trans)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("")); //info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - sendCommand(amfreply, play_msgtype, play_streamid); - //send streamisrecorded if stream, well, is recorded. - if (Strm.metadata.isMember("length") && Strm.metadata["length"].asInt() > 0){ - Socket.Send(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1 - } - //send streambegin - Socket.Send(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 - //and more reply - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus")); //status reply - amfreply.addContent(AMF::Object("", (double)play_trans)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("")); //info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - sendCommand(amfreply, play_msgtype, play_streamid); - RTMPStream::chunk_snd_max = 102400; //100KiB - Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max)); //send chunk size max (msg 1) - //send dunno? - Socket.Send(RTMPStream::SendUSR(32, 1)); //send UCM no clue?, stream 1 - play_trans = -1; + if (ready4data){ + if ( !inited){ + //we are ready, connect the socket! + ss = Util::Stream::getStream(streamName); + if ( !ss.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + Socket.close(); //disconnect user + break; } - - //sent init data if needed - if ( !stream_inited){ - init_tag.DTSCMetaInit(Strm); - Socket.SendNow(RTMPStream::SendMedia(init_tag)); - if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ - init_tag.DTSCAudioInit(Strm); - Socket.SendNow(RTMPStream::SendMedia(init_tag)); - } - if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){ - init_tag.DTSCVideoInit(Strm); - Socket.SendNow(RTMPStream::SendMedia(init_tag)); - } - stream_inited = true; + ss.setBlocking(false); + ss.SendNow("p\n"); + inited = true; + } + if (inited && !noStats){ + long long int now = Util::epoch(); + if (now != lastStats){ + lastStats = now; + ss.SendNow(Socket.getStats("RTMP").c_str()); } - //sent a tag - tag.DTSCLoader(Strm); - Socket.SendNow(RTMPStream::SendMedia(tag)); -#if DEBUG >= 8 - fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), tag.tagTime(), tag.tagType().c_str()); -#endif } - } - } - } - Socket.close(); - SS.SendNow(Socket.getStats("RTMP").c_str()); - SS.close(); - return 0; -} //Connector_RTMP - -///\brief Tries to get and parse one RTMP chunk at a time. -///\param inbuffer A buffer filled with chunk data. -void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){ - //for DTSC conversion - static JSON::Value meta_out; - static std::stringstream prebuffer; // Temporary buffer before sending real data - static bool sending = false; - static unsigned int counter = 0; - //for chunk parsing - static RTMPStream::Chunk next; - FLV::Tag F; - static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER); - static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER); - static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER); - static AMF::Object3 amf3elem("empty", AMF::AMF3_DDV_CONTAINER); - - while (next.Parse(inbuffer)){ - - //send ACK if we received a whole window - if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)){ - RTMPStream::rec_window_at = RTMPStream::rec_cnt; - Socket.Send(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3) - } - - switch (next.msg_type_id){ - case 0: //does not exist -#if DEBUG >= 2 - fprintf(stderr, "UNKN: Received a zero-type message. Possible data corruption? Aborting!\n"); -#endif - while (inbuffer.size()){ - inbuffer.get().clear(); - } - SS.close(); - Socket.close(); - break; //happens when connection breaks unexpectedly - case 1: //set chunk size - RTMPStream::chunk_rec_max = ntohl(*(int*)next.data.c_str()); -#if DEBUG >= 5 - fprintf(stderr, "CTRL: Set chunk size: %i\n", RTMPStream::chunk_rec_max); -#endif - break; - case 2: //abort message - we ignore this one -#if DEBUG >= 5 - fprintf(stderr, "CTRL: Abort message\n"); -#endif - //4 bytes of stream id to drop - break; - case 3: //ack -#if DEBUG >= 8 - fprintf(stderr, "CTRL: Acknowledgement\n"); -#endif - RTMPStream::snd_window_at = ntohl(*(int*)next.data.c_str()); - RTMPStream::snd_window_at = RTMPStream::snd_cnt; - break; - case 4: { - //2 bytes event type, rest = event data - //types: - //0 = stream begin, 4 bytes ID - //1 = stream EOF, 4 bytes ID - //2 = stream dry, 4 bytes ID - //3 = setbufferlen, 4 bytes ID, 4 bytes length - //4 = streamisrecorded, 4 bytes ID - //6 = pingrequest, 4 bytes data - //7 = pingresponse, 4 bytes data - //we don't need to process this -#if DEBUG >= 5 - short int ucmtype = ntohs(*(short int*)next.data.c_str()); - switch (ucmtype){ - case 0: - fprintf(stderr, "CTRL: UCM StreamBegin %i\n", ntohl(*((int*)(next.data.c_str()+2)))); - break; - case 1: - fprintf(stderr, "CTRL: UCM StreamEOF %i\n", ntohl(*((int*)(next.data.c_str()+2)))); - break; - case 2: - fprintf(stderr, "CTRL: UCM StreamDry %i\n", ntohl(*((int*)(next.data.c_str()+2)))); - break; - case 3: - fprintf(stderr, "CTRL: UCM SetBufferLength %i %i\n", ntohl(*((int*)(next.data.c_str()+2))), ntohl(*((int*)(next.data.c_str()+6)))); - break; - case 4: - fprintf(stderr, "CTRL: UCM StreamIsRecorded %i\n", ntohl(*((int*)(next.data.c_str()+2)))); - break; - case 6: - fprintf(stderr, "CTRL: UCM PingRequest %i\n", ntohl(*((int*)(next.data.c_str()+2)))); - break; - case 7: - fprintf(stderr, "CTRL: UCM PingResponse %i\n", ntohl(*((int*)(next.data.c_str()+2)))); - break; - default: - fprintf(stderr, "CTRL: UCM Unknown (%hi)\n", ucmtype); - break; - } -#endif - } - break; - case 5: //window size of other end -#if DEBUG >= 5 - fprintf(stderr, "CTRL: Window size\n"); -#endif - RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str()); - RTMPStream::rec_window_at = RTMPStream::rec_cnt; - Socket.Send(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3) - break; - case 6: -#if DEBUG >= 5 - fprintf(stderr, "CTRL: Set peer bandwidth\n"); -#endif - //4 bytes window size, 1 byte limit type (ignored) - RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str()); - Socket.Send(RTMPStream::SendCTL(5, RTMPStream::snd_window_size)); //send window acknowledgement size (msg 5) - break; - case 8: //audio data - case 9: //video data - case 18: //meta data - if (SS.connected()){ - F.ChunkLoader(next); - JSON::Value pack_out = F.toJSON(meta_out); - if ( !pack_out.isNull()){ - if ( !sending){ - counter++; - if (counter > 8){ - sending = true; - SS.SendNow(meta_out.toNetPacked()); - SS.SendNow(prebuffer.str().c_str(), prebuffer.str().size()); //write buffer - prebuffer.str(""); //clear buffer - SS.SendNow(pack_out.toNetPacked()); - }else{ - prebuffer << pack_out.toNetPacked(); + if (ss.spool()){ + while (Strm.parsePacket(ss.Received())){ + if (playTransaction != -1){ + //send a status reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus")); //status reply + amfreply.addContent(AMF::Object("", (double)playTransaction)); //same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfreply.addContent(AMF::Object("")); //info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfreply, playMessageType, playStreamId); + //send streamisrecorded if stream, well, is recorded. + if (Strm.metadata.isMember("length") && Strm.metadata["length"].asInt() > 0){ + Socket.Send(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1 } - }else{ - SS.SendNow(pack_out.toNetPacked()); + //send streambegin + Socket.Send(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 + //and more reply + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus")); //status reply + amfreply.addContent(AMF::Object("", (double)playTransaction)); //same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info + amfreply.addContent(AMF::Object("")); //info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfreply, playMessageType, playStreamId); + RTMPStream::chunk_snd_max = 102400; //100KiB + Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max)); //send chunk size max (msg 1) + //send dunno? + Socket.Send(RTMPStream::SendUSR(32, 1)); //send UCM no clue?, stream 1 + playTransaction = -1; } + + //sent init data if needed + if ( !streamInited){ + init_tag.DTSCMetaInit(Strm); + Socket.SendNow(RTMPStream::SendMedia(init_tag)); + if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ + init_tag.DTSCAudioInit(Strm); + Socket.SendNow(RTMPStream::SendMedia(init_tag)); + } + if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){ + init_tag.DTSCVideoInit(Strm); + Socket.SendNow(RTMPStream::SendMedia(init_tag)); + } + streamInited = true; + } + //sent a tag + tag.DTSCLoader(Strm); + Socket.SendNow(RTMPStream::SendMedia(tag)); + #if DEBUG >= 8 + fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), tag.tagTime(), tag.tagType().c_str()); + #endif } - }else{ -#if DEBUG >= 5 - fprintf(stderr, "Received useless media data\n"); -#endif - Socket.close(); } - break; - case 15: -#if DEBUG >= 5 - fprintf(stderr, "Received AFM3 data message\n"); -#endif - break; - case 16: -#if DEBUG >= 5 - fprintf(stderr, "Received AFM3 shared object\n"); -#endif - break; - case 17: { -#if DEBUG >= 5 - fprintf(stderr, "Received AFM3 command message\n"); -#endif - if (next.data[0] != 0){ - next.data = next.data.substr(1); - amf3data = AMF::parse3(next.data); -#if DEBUG >= 5 - amf3data.Print(); -#endif - }else{ -#if DEBUG >= 5 - fprintf(stderr, "Received AFM3-0 command message\n"); -#endif - next.data = next.data.substr(1); - amfdata = AMF::parse(next.data); - parseAMFCommand(amfdata, 17, next.msg_stream_id); - } //parsing AMF0-style - } - break; - case 19: -#if DEBUG >= 5 - fprintf(stderr, "Received AFM0 shared object\n"); -#endif - break; - case 20: { //AMF0 command message - amfdata = AMF::parse(next.data); - parseAMFCommand(amfdata, 20, next.msg_stream_id); - } - break; - case 22: -#if DEBUG >= 5 - fprintf(stderr, "Received aggregate message\n"); -#endif - break; - default: -#if DEBUG >= 1 - fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n"); -#endif - Connector_RTMP::stopparsing = true; - break; - } - } -} //parseChunk - -void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int stream_id){ -#if DEBUG >= 8 - std::cerr << amfreply.Print() << std::endl; -#endif - if (messagetype == 17){ - Socket.SendNow(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0 + amfreply.Pack())); - }else{ - Socket.SendNow(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack())); - } -} //sendCommand - -void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id){ -#if DEBUG >= 5 - fprintf(stderr, "Received command: %s\n", amfdata.Print().c_str()); -#endif -#if DEBUG >= 8 - fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str()); -#endif - if (amfdata.getContentP(0)->StrValue() == "connect"){ - double objencoding = 0; - if (amfdata.getContentP(2)->getContentP("objectEncoding")){ - objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue(); - } -#if DEBUG >= 6 - int tmpint; - if (amfdata.getContentP(2)->getContentP("videoCodecs")){ - tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); - if (tmpint & 0x04){ - fprintf(stderr, "Sorensen video support detected\n"); - } - if (tmpint & 0x80){ - fprintf(stderr, "H264 video support detected\n"); } } - if (amfdata.getContentP(2)->getContentP("audioCodecs")){ - tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); - if (tmpint & 0x04){ - fprintf(stderr, "MP3 audio support detected\n"); - } - if (tmpint & 0x400){ - fprintf(stderr, "AAC audio support detected\n"); - } - } -#endif - RTMPStream::chunk_snd_max = 4096; - Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max)); //send chunk size max (msg 1) - Socket.Send(RTMPStream::SendCTL(5, RTMPStream::snd_window_size)); //send window acknowledgement size (msg 5) - Socket.Send(RTMPStream::SendCTL(6, RTMPStream::rec_window_size)); //send rec window acknowledgement size (msg 6) - Socket.Send(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result")); //result success - amfreply.addContent(amfdata.getContent(1)); //same transaction ID - amfreply.addContent(AMF::Object("")); //server properties - amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,5,5,2004")); - amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31)); - amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); - amfreply.addContent(AMF::Object("")); //info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded.")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337)); - amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding)); - //amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); - //amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); - sendCommand(amfreply, messagetype, stream_id); - //send onBWDone packet - no clue what it is, but real server sends it... - //amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - //amfreply.addContent(AMF::Object("", "onBWDone"));//result - //amfreply.addContent(amfdata.getContent(1));//same transaction ID - //amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - //sendCommand(amfreply, messagetype, stream_id); - return; - } //connect - if (amfdata.getContentP(0)->StrValue() == "createStream"){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result")); //result success - amfreply.addContent(amfdata.getContent(1)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("", (double)1)); //stream ID - we use 1 - sendCommand(amfreply, messagetype, stream_id); - Socket.Send(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 - return; - } //createStream - if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ - if (SS.connected()){ - SS.close(); - } - return; - } - if ((amfdata.getContentP(0)->StrValue() == "FCPublish") || (amfdata.getContentP(0)->StrValue() == "FCUnpublish") || (amfdata.getContentP(0)->StrValue() == "releaseStream")){ - // ignored - return; - } - if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result")); //result success - amfreply.addContent(amfdata.getContent(1)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("", (double)0)); //zero length - sendCommand(amfreply, messagetype, stream_id); - return; - } //getStreamLength - if ((amfdata.getContentP(0)->StrValue() == "publish")){ - if (amfdata.getContentP(3)){ - streamname = amfdata.getContentP(3)->StrValue(); - /// \todo implement push for MistPlayer or restrict and change to getLive - SS = Util::Stream::getStream(streamname); - if ( !SS.connected()){ -#if DEBUG >= 1 - fprintf(stderr, "Could not connect to server!\n"); -#endif - Socket.close(); //disconnect user - return; - } - SS.Send("P "); - SS.Send(Socket.getHost().c_str()); - SS.Send("\n"); - nostats = true; - } - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result")); //result success - amfreply.addContent(amfdata.getContent(1)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL)); //publish success? - sendCommand(amfreply, messagetype, stream_id); - Socket.Send(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 - //send a status reply - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus")); //status reply - amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("")); //info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - sendCommand(amfreply, messagetype, stream_id); - return; - } //getStreamLength - if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result")); //result success - amfreply.addContent(amfdata.getContent(1)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - sendCommand(amfreply, messagetype, stream_id); - return; - } //checkBandwidth - if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ - //set reply number and stream name, actual reply is sent up in the SS.spool() handler - play_trans = amfdata.getContentP(1)->NumValue(); - play_msgtype = messagetype; - play_streamid = stream_id; - streamname = amfdata.getContentP(3)->StrValue(); - Connector_RTMP::ready4data = true; //start sending video data! - return; - } //play - if ((amfdata.getContentP(0)->StrValue() == "seek")){ - //set reply number and stream name, actual reply is sent up in the SS.spool() handler - play_trans = amfdata.getContentP(1)->NumValue(); - play_msgtype = messagetype; - play_streamid = stream_id; - stream_inited = false; - - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus")); //status reply - amfreply.addContent(amfdata.getContent(1)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("")); //info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Seek.Notify")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Seeking to the specified time")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - sendCommand(amfreply, play_msgtype, play_streamid); - SS.Send("s "); - SS.Send(JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString().c_str()); - SS.Send("\n"); - return; - } //seek - if ((amfdata.getContentP(0)->StrValue() == "pauseRaw") || (amfdata.getContentP(0)->StrValue() == "pause")){ - if (amfdata.getContentP(3)->NumValue()){ - SS.Send("q\n"); //quit playing - //send a status reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus")); //status reply - amfreply.addContent(amfdata.getContent(1)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("")); //info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Pause.Notify")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Pausing playback")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - sendCommand(amfreply, play_msgtype, play_streamid); - }else{ - SS.Send("p\n"); //start playing - //send a status reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus")); //status reply - amfreply.addContent(amfdata.getContent(1)); //same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info - amfreply.addContent(AMF::Object("")); //info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Unpause.Notify")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Resuming playback")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - sendCommand(amfreply, play_msgtype, play_streamid); - } - return; - } //seek - -#if DEBUG >= 2 - fprintf(stderr, "AMF0 command not processed!\n%s\n", amfdata.Print().c_str()); -#endif -} //parseAMFCommand + Socket.close(); + ss.SendNow(Socket.getStats("RTMP").c_str()); + ss.close(); + return 0; + } //Connector_RTMP +} ///\brief The standard process-spawning main function. int main(int argc, char ** argv){ @@ -616,7 +622,7 @@ int main(int argc, char ** argv){ if (S.connected()){ //check if the new connection is valid pid_t myid = fork(); if (myid == 0){ //if new child, start MAINHANDLER - return Connector_RTMP::Connector_RTMP(S); + return Connector_RTMP::rtmpConnector(S); }else{ //otherwise, do nothing or output debugging text #if DEBUG >= 5 fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket()); diff --git a/src/connectors/conn_ts.cpp b/src/connectors/conn_ts.cpp index 02e2bb94..b5c5a48f 100644 --- a/src/connectors/conn_ts.cpp +++ b/src/connectors/conn_ts.cpp @@ -2,145 +2,152 @@ /// Contains the main code for the TS Connector #include +#include +#include + #include #include #include -#include #include #include #include #include -#include #include #include #include + #include #include #include #include //TS support #include //DTSC support #include //For initdata conversion -/// The main function of the connector -/// \param conn A connection with the client -/// \param streamname The name of the stream -int TS_Handler(Socket::Connection conn, std::string streamname){ - std::string ToPack; - TS::Packet PackData; - std::string DTMIData; - int PacketNumber = 0; - long long unsigned int TimeStamp = 0; - int ThisNaluSize; - char VideoCounter = 0; - char AudioCounter = 0; - bool WritePesHeader; - bool IsKeyFrame; - bool FirstKeyFrame = true; - bool FirstIDRInKeyFrame; - MP4::AVCC avccbox; - bool haveAvcc = false; - DTSC::Stream Strm; - bool inited = false; - Socket::Connection ss; +///\brief Holds everything unique to the TS Connector +namespace Connector_TS { + ///\brief Main function for the TS Connector + ///\param conn A socket describing the connection the client. + ///\param streamName The stream to connect to. + ///\return The exit code of the connector. + int tsConnector(Socket::Connection conn, std::string streamName){ + std::string ToPack; + TS::Packet PackData; + std::string DTMIData; + int PacketNumber = 0; + long long unsigned int TimeStamp = 0; + int ThisNaluSize; + char VideoCounter = 0; + char AudioCounter = 0; + bool WritePesHeader; + bool IsKeyFrame; + bool FirstKeyFrame = true; + bool FirstIDRInKeyFrame; + MP4::AVCC avccbox; + bool haveAvcc = false; - while (conn.connected()){ - if ( !inited){ - ss = Util::Stream::getStream(streamname); - if ( !ss.connected()){ -#if DEBUG >= 1 - fprintf(stderr, "Could not connect to server!\n"); -#endif - conn.close(); - break; + DTSC::Stream Strm; + bool inited = false; + Socket::Connection ss; + + while (conn.connected()){ + if ( !inited){ + ss = Util::Stream::getStream(streamName); + if ( !ss.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + conn.close(); + break; + } + ss.SendNow("p\n"); + inited = true; } - ss.SendNow("p\n"); - inited = true; - } - if (ss.spool()){ - while (Strm.parsePacket(ss.Received())){ - if ( !haveAvcc){ - avccbox.setPayload(Strm.metadata["video"]["init"].asString()); - haveAvcc = true; - } - std::stringstream TSBuf; - Socket::Buffer ToPack; - //write PAT and PMT TS packets - if (PacketNumber == 0){ - PackData.DefaultPAT(); - TSBuf.write(PackData.ToString(), 188); - PackData.DefaultPMT(); - TSBuf.write(PackData.ToString(), 188); - PacketNumber += 2; - } - - int PIDno = 0; - char * ContCounter = 0; - if (Strm.lastType() == DTSC::VIDEO){ - IsKeyFrame = Strm.getPacket(0).isMember("keyframe"); - if (IsKeyFrame){ - TimeStamp = (Strm.getPacket(0)["time"].asInt() * 27000); + if (ss.spool()){ + while (Strm.parsePacket(ss.Received())){ + if ( !haveAvcc){ + avccbox.setPayload(Strm.metadata["video"]["init"].asString()); + haveAvcc = true; } - ToPack.append(avccbox.asAnnexB()); - while (Strm.lastData().size()){ - ThisNaluSize = (Strm.lastData()[0] << 24) + (Strm.lastData()[1] << 16) + (Strm.lastData()[2] << 8) + Strm.lastData()[3]; - Strm.lastData().replace(0, 4, TS::NalHeader, 4); - if (ThisNaluSize + 4 == Strm.lastData().size()){ - ToPack.append(Strm.lastData()); - break; - }else{ - ToPack.append(Strm.lastData().c_str(), ThisNaluSize + 4); - Strm.lastData().erase(0, ThisNaluSize + 4); + std::stringstream TSBuf; + Socket::Buffer ToPack; + //write PAT and PMT TS packets + if (PacketNumber == 0){ + PackData.DefaultPAT(); + TSBuf.write(PackData.ToString(), 188); + PackData.DefaultPMT(); + TSBuf.write(PackData.ToString(), 188); + PacketNumber += 2; + } + + int PIDno = 0; + char * ContCounter = 0; + if (Strm.lastType() == DTSC::VIDEO){ + IsKeyFrame = Strm.getPacket(0).isMember("keyframe"); + if (IsKeyFrame){ + TimeStamp = (Strm.getPacket(0)["time"].asInt() * 27000); } + ToPack.append(avccbox.asAnnexB()); + while (Strm.lastData().size()){ + ThisNaluSize = (Strm.lastData()[0] << 24) + (Strm.lastData()[1] << 16) + (Strm.lastData()[2] << 8) + Strm.lastData()[3]; + Strm.lastData().replace(0, 4, TS::NalHeader, 4); + if (ThisNaluSize + 4 == Strm.lastData().size()){ + ToPack.append(Strm.lastData()); + break; + }else{ + ToPack.append(Strm.lastData().c_str(), ThisNaluSize + 4); + Strm.lastData().erase(0, ThisNaluSize + 4); + } + } + ToPack.prepend(TS::Packet::getPESVideoLeadIn(0ul, Strm.getPacket(0)["time"].asInt() * 90)); + PIDno = 0x100; + ContCounter = &VideoCounter; + }else if (Strm.lastType() == DTSC::AUDIO){ + ToPack.append(TS::GetAudioHeader(Strm.lastData().size(), Strm.metadata["audio"]["init"].asString())); + ToPack.append(Strm.lastData()); + ToPack.prepend(TS::Packet::getPESAudioLeadIn(ToPack.bytes(1073741824ul), Strm.getPacket(0)["time"].asInt() * 90)); + PIDno = 0x101; + ContCounter = &AudioCounter; } - ToPack.prepend(TS::Packet::getPESVideoLeadIn(0ul, Strm.getPacket(0)["time"].asInt() * 90)); - PIDno = 0x100; - ContCounter = &VideoCounter; - }else if (Strm.lastType() == DTSC::AUDIO){ - ToPack.append(TS::GetAudioHeader(Strm.lastData().size(), Strm.metadata["audio"]["init"].asString())); - ToPack.append(Strm.lastData()); - ToPack.prepend(TS::Packet::getPESAudioLeadIn(ToPack.bytes(1073741824ul), Strm.getPacket(0)["time"].asInt() * 90)); - PIDno = 0x101; - ContCounter = &AudioCounter; - } - //initial packet - PackData.Clear(); - PackData.PID(PIDno); - PackData.ContinuityCounter(( *ContCounter)++); - PackData.UnitStart(1); - if (IsKeyFrame){ - PackData.RandomAccess(1); - PackData.PCR(TimeStamp); - } - unsigned int toSend = PackData.AddStuffing(ToPack.bytes(184)); - std::string gonnaSend = ToPack.remove(toSend); - PackData.FillFree(gonnaSend); - TSBuf.write(PackData.ToString(), 188); - PacketNumber++; - - //rest of packets - while (ToPack.size()){ + //initial packet PackData.Clear(); PackData.PID(PIDno); PackData.ContinuityCounter(( *ContCounter)++); - toSend = PackData.AddStuffing(ToPack.bytes(184)); - gonnaSend = ToPack.remove(toSend); + PackData.UnitStart(1); + if (IsKeyFrame){ + PackData.RandomAccess(1); + PackData.PCR(TimeStamp); + } + unsigned int toSend = PackData.AddStuffing(ToPack.bytes(184)); + std::string gonnaSend = ToPack.remove(toSend); PackData.FillFree(gonnaSend); TSBuf.write(PackData.ToString(), 188); PacketNumber++; - } - TSBuf.flush(); - if (TSBuf.str().size()){ - conn.SendNow(TSBuf.str().c_str(), TSBuf.str().size()); + //rest of packets + while (ToPack.size()){ + PackData.Clear(); + PackData.PID(PIDno); + PackData.ContinuityCounter(( *ContCounter)++); + toSend = PackData.AddStuffing(ToPack.bytes(184)); + gonnaSend = ToPack.remove(toSend); + PackData.FillFree(gonnaSend); + TSBuf.write(PackData.ToString(), 188); + PacketNumber++; + } + + TSBuf.flush(); + if (TSBuf.str().size()){ + conn.SendNow(TSBuf.str().c_str(), TSBuf.str().size()); + TSBuf.str(""); + } TSBuf.str(""); + PacketNumber = 0; } - TSBuf.str(""); - PacketNumber = 0; } } + return 0; } - return 0; } int main(int argc, char ** argv){ @@ -160,7 +167,7 @@ int main(int argc, char ** argv){ if (S.connected()){ //check if the new connection is valid pid_t myid = fork(); if (myid == 0){ //if new child, start MAINHANDLER - return TS_Handler(S, conf.getString("streamname")); + return Connector_TS::tsConnector(S, conf.getString("streamname")); }else{ //otherwise, do nothing or output debugging text #if DEBUG >= 5 fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());