From 683e2c619027486fbed6c8decb9de2dd4d2168f3 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 10 Feb 2014 00:36:02 +0100 Subject: [PATCH] Implemented MP4 seeking, improved HTTP proxy debugging output, fixed HTTP proxy bug where closed connections were erronously re-used mid-transfer. --- src/connectors/conn_http.cpp | 38 +-- src/connectors/conn_http_progressive_mp4.cpp | 241 ++++++++++++++++--- 2 files changed, 225 insertions(+), 54 deletions(-) diff --git a/src/connectors/conn_http.cpp b/src/connectors/conn_http.cpp index dcd3ef2c..ae56354b 100644 --- a/src/connectors/conn_http.cpp +++ b/src/connectors/conn_http.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "embed.js.h" @@ -38,13 +39,11 @@ namespace Connector_HTTP { conn = 0; lastUse = 0; } - ; /// Constructor that sets lastUse to 0, but socket to s. ConnConn(Socket::Connection * s){ conn = s; lastUse = 0; } - ; /// Destructor that deletes the socket if non-null. ~ConnConn(){ if (conn){ @@ -53,7 +52,6 @@ namespace Connector_HTTP { } conn = 0; } - ; }; std::map connectorConnections; ///< Connections to connectors @@ -410,13 +408,7 @@ namespace Connector_HTTP { if ( !connectorConnections.count(uid)){ connectorConnections[uid] = new ConnConn(new Socket::Connection(Util::getTmpFolder() + connector)); connectorConnections[uid]->conn->setBlocking(false); //do not block on spool() with no data -#if DEBUG >= 4 - std::cout << "Created new connection " << uid << std::endl; -#endif - }else{ -#if DEBUG >= 5 - std::cout << "Re-using connection " << uid << std::endl; -#endif + DEBUG_MSG(DLVL_HIGH, "Created new connection %s", uid.c_str()); } //attempt to lock the mutex for this connection @@ -424,10 +416,13 @@ namespace Connector_HTTP { myCConn = connectorConnections[uid]; //if the connection is dead, delete it and re-loop if (!myCConn->conn->spool() && !myCConn->conn->connected()){ + DEBUG_MSG(DLVL_HIGH, "Resetting existing connection %s", uid.c_str()); connectorConnections.erase(uid); myCConn->inUse.unlock(); delete myCConn; myCConn = 0; + }else{ + DEBUG_MSG(DLVL_HIGH, "Using active connection %s", uid.c_str()); } } //unlock the connection mutex before sleeping @@ -474,7 +469,7 @@ namespace Connector_HTTP { } retries++; if (retries >= 10){ - std::cout << "[5 retry-laters, cancelled]" << std::endl; + DEBUG_MSG(DLVL_HIGH, "Cancelled connection %s, because of 208 status repeated 10 times", uid.c_str()); myCConn->conn->close(); myCConn->inUse.unlock(); //unset to only read headers @@ -493,7 +488,7 @@ namespace Connector_HTTP { } //keep trying unless the timeout triggers if (timeout++ > 4000){ - std::cout << "[20s timeout triggered]" << std::endl; + DEBUG_MSG(DLVL_HIGH, "Canceled connection %s, 4s timeout", uid.c_str()); myCConn->conn->close(); myCConn->inUse.unlock(); //unset to only read headers @@ -515,12 +510,18 @@ namespace Connector_HTTP { //success, check type of response if (H.GetHeader("Content-Length") != "" || H.GetHeader("Transfer-Encoding") == "chunked"){ //known length - simply re-send the request with added headers and continue + DEBUG_MSG(DLVL_HIGH, "Proxying %s - known length or chunked transfer encoding", uid.c_str()); H.SetHeader("X-UID", uid); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.body = ""; H.Proxy(*(myCConn->conn), conn); + if (!conn.connected()){ + DEBUG_MSG(DLVL_HIGH, "Incoming connection to %s dropped, killing off connector", uid.c_str()); + myCConn->conn->close(); + } myCConn->inUse.unlock(); }else{ + DEBUG_MSG(DLVL_HIGH, "Handing off %s - one-time connection", uid.c_str()); //unknown length H.SetHeader("X-UID", uid); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); @@ -630,11 +631,10 @@ namespace Connector_HTTP { if (conn.spool() || conn.Received().size()){ if (Client.Read(conn)){ std::string handler = proxyGetHandleType(Client); -#if DEBUG >= 4 - std::cout << "Received request: " << Client.getUrl() << " (" << conn.getSocket() << ") => " << handler << " (" << Client.GetVar("stream") - << ")" << std::endl; + DEBUG_MSG(DLVL_HIGH, "Received request: %s (%d) => %s (%s)", Client.getUrl().c_str(), conn.getSocket(), handler.c_str(), Client.GetVar("stream").c_str()); + #if DEBUG >= DLVL_HIGH long long int startms = Util::getMS(); -#endif + #endif long long int midms = 0; bool closeConnection = false; if (Client.GetHeader("Connection") == "close"){ @@ -650,10 +650,10 @@ namespace Connector_HTTP { }else{ midms = proxyHandleThroughConnector(Client, conn, handler); } -#if DEBUG >= 4 + #if DEBUG >= DLVL_HIGH long long int nowms = Util::getMS(); - std::cout << "Completed request " << conn.getSocket() << " " << handler << " in " << (midms - startms) << " ms (processing) / " << (nowms - midms) << " ms (transfer)" << std::endl; -#endif + DEBUG_MSG(DLVL_HIGH, "Completed request %d (%s) in %d ms (processing) / %d ms (transfer)", conn.getSocket(), handler.c_str(), (midms - startms), (nowms - midms)); + #endif if (closeConnection){ break; } diff --git a/src/connectors/conn_http_progressive_mp4.cpp b/src/connectors/conn_http_progressive_mp4.cpp index c3c85b71..40cd2292 100644 --- a/src/connectors/conn_http_progressive_mp4.cpp +++ b/src/connectors/conn_http_progressive_mp4.cpp @@ -46,7 +46,7 @@ namespace Connector_HTTP { long unsigned int index; }; - std::string DTSCMeta2MP4Header(DTSC::Meta & metaData, std::set & tracks){ + std::string DTSCMeta2MP4Header(DTSC::Meta & metaData, std::set & tracks, long long & size){ std::stringstream header; //ftyp box /// \todo fill ftyp with non hardcoded values from file @@ -264,6 +264,7 @@ namespace Connector_HTTP { total = 0; for (std::deque< DTSC::Part>::iterator partIt = metaData.tracks[*it].parts.begin(); partIt != metaData.tracks[*it].parts.end(); partIt ++) { stszBox.setEntrySize(partIt->getSize(), total);//in bytes in file + size += partIt->getSize(); total++; } stblBox.setContent(stszBox,offset++); @@ -371,21 +372,139 @@ namespace Connector_HTTP { header << (char)((mdatSize>>24) & 0x000000FF) << (char)((mdatSize>>16) & 0x000000FF) << (char)((mdatSize>>8) & 0x000000FF) << (char)(mdatSize & 0x000000FF) << "mdat"; //end of header + size += header.str().size(); return header.str(); } + /// Calculate a seekPoint, based on byteStart, metadata, tracks and headerSize. + /// The seekPoint will be set to the timestamp of the first packet to send. + void findSeekPoint(long long byteStart, long long & seekPoint, DTSC::Meta & metadata, std::set & tracks, unsigned int headerSize){ + seekPoint = 0; + //if we're starting in the header, seekPoint is always zero. + if (byteStart <= headerSize){return;} + //okay, we're past the header. Substract the headersize from the starting postion. + byteStart -= headerSize; + //initialize a list of sorted parts that this file contains + std::set sortSet; + for (std::set::iterator subIt = tracks.begin(); subIt != tracks.end(); subIt++) { + keyPart temp; + temp.trackID = *subIt; + temp.time = metadata.tracks[*subIt].firstms;//timeplace of frame + temp.endTime = metadata.tracks[*subIt].firstms + metadata.tracks[*subIt].parts[0].getDuration(); + temp.size = metadata.tracks[*subIt].parts[0].getSize();//bytesize of frame (alle parts all together) + temp.index = 0; + sortSet.insert(temp); + } + //forward through the file by headers, until we reach the point where we need to be + while (!sortSet.empty()){ + //substract the size of this fragment from byteStart + byteStart -= sortSet.begin()->size; + //if that put us past the point where we wanted to be, return right now + if (byteStart < 0){return;} + //otherwise, set seekPoint to where we are now + seekPoint = sortSet.begin()->time; + //then find the next part + keyPart temp; + temp.index = sortSet.begin()->index + 1; + temp.trackID = sortSet.begin()->trackID; + if(temp.index < metadata.tracks[temp.trackID].parts.size() ){//only insert when there are parts left + temp.time = sortSet.begin()->endTime;//timeplace of frame + temp.endTime = sortSet.begin()->endTime + metadata.tracks[temp.trackID].parts[temp.index].getDuration(); + temp.size = metadata.tracks[temp.trackID].parts[temp.index].getSize();//bytesize of frame + sortSet.insert(temp); + } + //remove highest keyPart + sortSet.erase(sortSet.begin()); + } + //If we're here, we're in the last fragment. + //That's technically legal, of course. + } + + /// Parses a "Range: " header, setting byteStart, byteEnd and seekPoint using data from metadata and tracks to do + /// the calculations. + /// On error, byteEnd is set to zero. + void parseRange(std::string header, long long & byteStart, long long & byteEnd, long long & seekPoint, DTSC::Meta & metadata, std::set & tracks, unsigned int headerSize){ + if (header.size() < 6 || header.substr(0, 6) != "bytes="){ + byteEnd = 0; + DEBUG_MSG(DLVL_WARN, "Invalid range header: %s", header.c_str()); + return; + } + header.erase(0, 6); + if (header.size() && header[0] == '-'){ + //negative range = count from end + byteStart = 0; + for (unsigned int i = 1; i < header.size(); ++i){ + if (header[i] >= '0' && header[i] <= '9'){ + byteStart *= 10; + byteStart += header[i] - '0'; + continue; + } + break; + } + if (byteStart > byteEnd){ + //entire file if starting before byte zero + byteStart = 0; + DEBUG_MSG(DLVL_DEVEL, "Full negative range: %lli-%lli", byteStart, byteEnd); + findSeekPoint(byteStart, seekPoint, metadata, tracks, headerSize); + return; + }else{ + //start byteStart bytes before byteEnd + byteStart = byteEnd - byteStart; + DEBUG_MSG(DLVL_DEVEL, "Partial negative range: %lli-%lli", byteStart, byteEnd); + findSeekPoint(byteStart, seekPoint, metadata, tracks, headerSize); + return; + } + }else{ + long long size = byteEnd; + byteEnd = 0; + byteStart = 0; + unsigned int i = 0; + for ( ; i < header.size(); ++i){ + if (header[i] >= '0' && header[i] <= '9'){ + byteStart *= 10; + byteStart += header[i] - '0'; + continue; + } + break; + } + if (header[i] != '-'){ + DEBUG_MSG(DLVL_WARN, "Invalid range header: %s", header.c_str()); + byteEnd = 0; + return; + } + ++i; + if (i < header.size()){ + for ( ; i < header.size(); ++i){ + if (header[i] >= '0' && header[i] <= '9'){ + byteEnd *= 10; + byteEnd += header[i] - '0'; + continue; + } + break; + } + if (byteEnd > size-1){byteEnd = size;} + }else{ + byteEnd = size; + } + DEBUG_MSG(DLVL_DEVEL, "Range request: %lli-%lli (%s)", byteStart, byteEnd, header.c_str()); + findSeekPoint(byteStart, seekPoint, metadata, tracks, headerSize); + return; + } + }//parseRange + ///\brief Main function for the HTTP Progressive Connector ///\param conn A socket describing the connection the client. ///\return The exit code of the connector. int progressiveConnector(Socket::Connection & conn){ DTSC::Stream Strm; //Incoming stream buffer. HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender. + long long byteStart = 0; + long long leftOver = 0; + long long currPos = 0; bool inited = false;//Whether the stream is initialized Socket::Connection ss( -1);//The Stream Socket, used to connect to the desired stream. std::string streamname;//Will contain the name of the stream. - #if DEBUG >= DLVL_HIGH std::set sortSet;//filling sortset for interleaving parts - #endif unsigned int lastStats = 0;//Indicates the last time that we have sent stats to the server socket. @@ -394,15 +513,21 @@ namespace Connector_HTTP { if ( !inited){ if (conn.Received().size() || conn.spool()){ if (HTTP_R.Read(conn)){ -#if DEBUG >= 5 - std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; -#endif + DEBUG_MSG(DLVL_DEVEL, "Received request: %s", HTTP_R.getUrl().c_str()); conn.setHost(HTTP_R.GetHeader("X-Origin")); streamname = HTTP_R.GetHeader("X-Stream"); - //we are ready, connect the socket! - ss = Util::Stream::getStream(streamname); - Strm.waitForMeta(ss); - + if (!ss){ + ss = Util::Stream::getStream(streamname); + if (!ss){ + DEBUG_MSG(DLVL_FAIL, "Could not connect to stream %s!", streamname.c_str()); + ss.close(); + HTTP_S.Clean(); + HTTP_S.SetBody("No such stream is available on the system. Please try again.\n"); + HTTP_S.SendResponse("404", "Not found", conn); + continue; + } + Strm.waitForMeta(ss); + } int videoID = -1; int audioID = -1; if (HTTP_R.GetVar("audio") != ""){ @@ -425,10 +550,51 @@ namespace Connector_HTTP { if (audioID > 0){tracks.insert(audioID);} HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers - HTTP_S.SetHeader("Content-Type", "video/MP4"); //Send the correct content-type for FLV files - HTTP_S.protocol = "HTTP/1.0"; - conn.SendNow(HTTP_S.BuildResponse("200", "OK")); //no SetBody = unknown length - this is intentional, we will stream the entire file - conn.SendNow(DTSCMeta2MP4Header(Strm.metadata, tracks));//SENDING MP4HEADER + HTTP_S.SetHeader("Content-Type", "video/MP4"); //Send the correct content-type for MP4 files + HTTP_S.SetHeader("Accept-Ranges", "bytes, parsec"); + long long size = 0; + std::string headerData = DTSCMeta2MP4Header(Strm.metadata, tracks, size); + byteStart = 0; + long long byteEnd = size-1; + long long seekPoint = 0; + if (HTTP_R.GetHeader("Range") != ""){ + parseRange(HTTP_R.GetHeader("Range"), byteStart, byteEnd, seekPoint, Strm.metadata, tracks, headerData.size()); + if (!byteEnd){ + if (HTTP_R.GetHeader("Range")[0] == 'p'){ + HTTP_S.SetBody("Starsystem not in communications range"); + HTTP_S.SendResponse("416", "Starsystem not in communications range", conn); + HTTP_R.Clean(); //clean for any possible next requests + continue; + }else{ + HTTP_S.SetBody("Requested Range Not Satisfiable"); + HTTP_S.SendResponse("416", "Requested Range Not Satisfiable", conn); + HTTP_R.Clean(); //clean for any possible next requests + continue; + } + }else{ + std::stringstream rangeReply; + rangeReply << "bytes " << byteStart << "-" << byteEnd << "/" << size; + HTTP_S.SetHeader("Content-Length", byteEnd - byteStart + 1); + HTTP_S.SetHeader("Content-Range", rangeReply.str()); + /// \todo Switch to chunked? + HTTP_S.SendResponse("206", "Partial content", conn); + //HTTP_S.StartResponse("206", "Partial content", HTTP_R, conn); + } + }else{ + HTTP_S.SetHeader("Content-Length", byteEnd - byteStart + 1); + /// \todo Switch to chunked? + HTTP_S.SendResponse("200", "OK", conn); + //HTTP_S.StartResponse(HTTP_R, conn); + } + leftOver = byteEnd - byteStart + 1;//add one byte, because range "0-0" = 1 byte of data + currPos = 0; + if (byteStart < (long long)headerData.size()){ + /// \todo Switch to chunked? + //HTTP_S.Chunkify(headerData.data()+byteStart, std::min((long long)headerData.size(), byteEnd) - byteStart, conn);//send MP4 header + conn.SendNow(headerData.data()+byteStart, std::min((long long)headerData.size(), byteEnd) - byteStart);//send MP4 header + leftOver -= std::min((long long)headerData.size(), byteEnd) - byteStart; + } + currPos = headerData.size();//we're now guaranteed to be past the header point, no matter what HTTP_R.Clean(); //clean for any possible next requests {//using scope to have cmd not declared after action std::stringstream cmd; @@ -436,10 +602,10 @@ namespace Connector_HTTP { for (std::set::iterator it = tracks.begin(); it != tracks.end(); it++) { cmd << " " << *it; } - cmd << "\ns 0\np\n"; + cmd << "\ns " << seekPoint << "\np\n"; ss.SendNow(cmd.str()); } - #if DEBUG >= DLVL_HIGH + sortSet.clear(); for (std::set::iterator subIt = tracks.begin(); subIt != tracks.end(); subIt++) { keyPart temp; temp.trackID = *subIt; @@ -449,17 +615,6 @@ namespace Connector_HTTP { temp.index = 0; sortSet.insert(temp); } - #endif - if ( !ss.connected()){ - #if DEBUG >= 1 - fprintf(stderr, "Could not connect to server for %s!\n", streamname.c_str()); - #endif - ss.close(); - HTTP_S.Clean(); - HTTP_S.SetBody("No such stream is available on the system. Please try again.\n"); - conn.SendNow(HTTP_S.BuildResponse("404", "Not found")); - continue; - } inited = true; } } @@ -474,12 +629,8 @@ namespace Connector_HTTP { if (Strm.lastType() == DTSC::PAUSEMARK){ conn.close(); }else if(Strm.lastType() == DTSC::AUDIO || Strm.lastType() == DTSC::VIDEO){ - #if DEBUG >= DLVL_HIGH - if (!sortSet.empty()){ - if ((long long)sortSet.begin()->trackID != Strm.getPacket()["trackid"].asInt() || (long long)sortSet.begin()->time != Strm.getPacket()["time"].asInt()){ - DEBUG_MSG(DLVL_HIGH, "Set[%d, %d] => Real[%d, %d]", sortSet.begin()->trackID, sortSet.begin()->time, Strm.getPacket()["trackid"].asInt(), Strm.getPacket()["time"].asInt()); - } - //add keyPart to sortSet + //keep track of where we are - fast-forward until where we are now + while (!sortSet.empty() && ((long long)sortSet.begin()->trackID != Strm.getPacket()["trackid"].asInt() || (long long)sortSet.begin()->time != Strm.getPacket()["time"].asInt())){ keyPart temp; temp.index = sortSet.begin()->index + 1; temp.trackID = sortSet.begin()->trackID; @@ -489,11 +640,31 @@ namespace Connector_HTTP { temp.size = Strm.metadata.tracks[temp.trackID].parts[temp.index].getSize();//bytesize of frame sortSet.insert(temp); } + currPos += sortSet.begin()->size; //remove highest keyPart sortSet.erase(sortSet.begin()); } - #endif - conn.SendNow(Strm.lastData());//send out and clear Converter buffer + if (currPos >= byteStart){ + sortSet.clear();//we don't need you anymore! + if (leftOver < (long long)Strm.lastData().size()){ + conn.SendNow(Strm.lastData().data(), leftOver); + }else{ + conn.SendNow(Strm.lastData()); + } + //HTTP_S.Chunkify(Strm.lastData().data(), Strm.lastData().size(), conn); + leftOver -= Strm.lastData().size(); + }else{ + if (currPos + (long long)Strm.lastData().size() > byteStart){ + conn.SendNow(Strm.lastData().data()+(byteStart-currPos), Strm.lastData().size()-(byteStart-currPos)); + leftOver -= Strm.lastData().size()-(byteStart-currPos); + currPos = byteStart; + sortSet.clear();//we don't need you anymore! + } + } + if (leftOver < 1){ + ss.SendNow("q\n");//stop playback + inited = false; + } } if (Strm.lastType() == DTSC::INVALID){ DEBUG_MSG(DLVL_FAIL, "Invalid packet received - closing connection");