Removed lots of left-over debug messages, added chunked transfer support to all segmented HTTP streaming methods, fixed several major slowdown problems, fixed HTTP progressive seconds-seeking.

This commit is contained in:
Thulinma 2013-08-23 00:07:02 +02:00
parent 2d9859bbad
commit 2bc9858b66
7 changed files with 460 additions and 481 deletions

View file

@ -403,18 +403,14 @@ namespace Connector_HTTP {
}else{
long long int ret = Util::getMS();
//success, check type of response
std::cout << "Response headers for " << orig_url << " received...";
if (H.GetHeader("Content-Length") != "" || H.GetHeader("Transfer-Encoding") == "chunked"){
//known length - simply re-send the request with added headers and continue
H.SetHeader("X-UID", uid);
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.body = "";
std::cout << "proxying..." << std::endl;
H.Proxy(*(myCConn->conn), *conn);
std::cout << "Proxying " << orig_url << " completed!" << std::endl;
myCConn->inUse.unlock();
}else{
std::cout << "progressin'..." << std::endl;
//unknown length
H.SetHeader("X-UID", uid);
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);

View file

@ -150,7 +150,7 @@ namespace Connector_HTTP {
Socket::Connection ss( -1);
std::string streamname;
std::string recBuffer = "";
bool handlingRequest = false;
int Quality = 0;
int Segment = -1;
@ -161,6 +161,7 @@ namespace Connector_HTTP {
conn.setBlocking(false); //do not block on conn.spool() when no data is available
while (conn.connected()){
if ( !handlingRequest){
if (conn.spool() || conn.Received().size()){
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
@ -173,7 +174,7 @@ namespace Connector_HTTP {
if ( !ss.connected()){
HTTP_S.Clean();
HTTP_S.SetBody("No such stream is available on the system. Please try again.\n");
conn.SendNow(HTTP_S.BuildResponse("404", "Not found"));
HTTP_S.SendResponse("404", "Not found", conn);
continue;
}
ss.setBlocking(false);
@ -194,7 +195,7 @@ namespace Connector_HTTP {
HTTP_S.SetBody(dynamicBootstrap(streamname, Strm.getTrackById(atoll(streamID.c_str())),Strm.metadata.isMember("live")));
HTTP_S.SetHeader("Content-Type", "binary/octet");
HTTP_S.SetHeader("Cache-Control", "no-cache");
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
HTTP_S.SendResponse("200", "OK", conn);
HTTP_R.Clean(); //clean for any possible next requests
continue;
}
@ -222,7 +223,7 @@ namespace Connector_HTTP {
if (it == vidTrack["keys"].ArrEnd() - 2){
HTTP_S.Clean();
HTTP_S.SetBody("Proxy, re-request this in a second or two.\n");
conn.SendNow(HTTP_S.BuildResponse("208", "Ask again later"));
HTTP_S.SendResponse("208", "Ask again later", conn);
HTTP_R.Clean(); //clean for any possible next requests
std::cout << "Fragment after fragment " << ReqFragment << " not available yet" << std::endl;
}
@ -236,7 +237,7 @@ namespace Connector_HTTP {
if (mstime == 0 && ReqFragment > 1){
HTTP_S.Clean();
HTTP_S.SetBody("The requested fragment is no longer kept in memory on the server and cannot be served.\n");
conn.SendNow(HTTP_S.BuildResponse("412", "Fragment out of range"));
HTTP_S.SendResponse("412", "Fragment out of range", conn);
HTTP_R.Clean(); //clean for any possible next requests
std::cout << "Fragment " << ReqFragment << " too old" << std::endl;
continue;
@ -245,18 +246,16 @@ namespace Connector_HTTP {
std::stringstream sstream;
sstream << "t " << Quality << " " << audioTrack << "\ns " << mstime << "\np " << (mstime + mslen) << "\n";
ss.SendNow(sstream.str().c_str());
std::cout << sstream.str() << std::endl;
HTTP_S.Clean();
HTTP_S.protocol = "HTTP/1.1";
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.SetBody("");
std::string new_strap = dynamicBootstrap(streamname, Strm.getTrackById(Quality), Strm.metadata.isMember("live"), ReqFragment);
HTTP_S.SetHeader("Transfer-Encoding", "chunked");
HTTP_S.SendResponse("200", "OK", conn);
HTTP_S.Chunkify(new_strap, conn);
HTTP_S.StartResponse(HTTP_R, conn);
//send the bootstrap
std::string bootstrap = dynamicBootstrap(streamname, Strm.getTrackById(Quality), Strm.metadata.isMember("live"), ReqFragment);
HTTP_S.Chunkify(bootstrap, conn);
//send a zero-size mdat, meaning it stretches until end of file.
HTTP_S.Chunkify("\000\000\000\000mdat", 8, conn);
//fill buffer with init data, if needed.
//send init data, if needed.
if (audioTrack > 0 && Strm.getTrackById(audioTrack).isMember("init")){
tmp.DTSCAudioInit(Strm.getTrackById(audioTrack));
tmp.tagTime(mstime);
@ -267,18 +266,20 @@ namespace Connector_HTTP {
tmp.tagTime(mstime);
HTTP_S.Chunkify(tmp.data, tmp.len, conn);
}
handlingRequest = true;
}else{
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml");
HTTP_S.SetHeader("Cache-Control", "no-cache");
std::string manifest = dynamicIndex(streamname, Strm.metadata);
HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
HTTP_S.SetBody(dynamicIndex(streamname, Strm.metadata));
HTTP_S.SendResponse("200", "OK", conn);
}
HTTP_R.Clean(); //clean for any possible next requests
}
}else{
Util::sleep(1);
//sleep for 250ms before next attempt
Util::sleep(250);
}
}
if (ss.connected()){
unsigned int now = Util::epoch();
@ -286,13 +287,12 @@ namespace Connector_HTTP {
lastStats = now;
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
}
if (ss.spool()){
if (handlingRequest && ss.spool()){
while (Strm.parsePacket(ss.Received())){
if (Strm.lastType() == DTSC::PAUSEMARK){
//send an empty chunk to signify request is done
std::string empty = "";
HTTP_S.Chunkify(empty, conn);
std::cout << "Finito!" << std::endl;
HTTP_S.Chunkify("", 0, conn);
handlingRequest = false;
}
if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){
//send a chunk with the new data

View file

@ -94,9 +94,6 @@ namespace Connector_HTTP {
///\param conn A socket describing the connection the client.
///\return The exit code of the connector.
int liveConnector(Socket::Connection conn){
std::stringstream TSBuf;
long long int TSBufTime = 0;
DTSC::Stream Strm; //Incoming stream buffer.
HTTP::Parser HTTP_R, HTTP_S; //HTTP Receiver en HTTP Sender.
@ -104,6 +101,7 @@ namespace Connector_HTTP {
bool AppleCompat = false; //Set to true when Apple device detected.
Socket::Connection ss( -1);
std::string streamname;
bool handlingRequest = false;
std::string recBuffer = "";
TS::Packet PackData;
@ -129,11 +127,12 @@ namespace Connector_HTTP {
conn.setBlocking(false); //do not block on conn.spool() when no data is available
while (conn.connected()){
if ( !handlingRequest){
if (conn.spool() || conn.Received().size()){
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
#endif
conn.setHost(HTTP_R.GetHeader("X-Origin"));
AppleCompat = (HTTP_R.GetHeader("User-Agent").find("Apple") != std::string::npos);
streamname = HTTP_R.GetHeader("X-Stream");
@ -198,6 +197,11 @@ namespace Connector_HTTP {
sstream << "s " << Segment << "\n";
sstream << "p " << frameCount << "\n";
ss.SendNow(sstream.str().c_str());
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp2t");
HTTP_S.StartResponse(HTTP_R, conn);
handlingRequest = true;
}else{
std::string request = HTTP_R.url.substr(HTTP_R.url.find("/", 5) + 1);
if (HTTP_R.url.find(".m3u8") != std::string::npos){
@ -222,7 +226,8 @@ namespace Connector_HTTP {
HTTP_R.Clean(); //clean for any possible next requests
}
}else{
Util::sleep(1);
Util::sleep(250);
}
}
if (ready4data){
unsigned int now = Util::epoch();
@ -230,23 +235,11 @@ namespace Connector_HTTP {
lastStats = now;
ss.SendNow(conn.getStats("HTTP_Live").c_str());
}
if (ss.spool()){
if (handlingRequest && ss.spool()){
while (Strm.parsePacket(ss.Received())){
if (Strm.lastType() == DTSC::PAUSEMARK){
TSBuf.flush();
if (TSBuf.str().size()){
HTTP_S.Clean();
HTTP_S.protocol = "HTTP/1.1";
HTTP_S.SetHeader("Content-Type", "video/mp2t");
HTTP_S.SetHeader("Connection", "keep-alive");
HTTP_S.SetBody("");
HTTP_S.SetHeader("Content-Length", TSBuf.str().size());
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
conn.SendNow(TSBuf.str().c_str(), TSBuf.str().size());
TSBuf.str("");
PacketNumber = 0;
}
TSBuf.str("");
HTTP_S.Chunkify("", 0, conn);
handlingRequest = false;
}
if ( !haveAvcc){
avccbox.setPayload(Strm.getTrackById(trackID)["init"].asString());
@ -257,9 +250,9 @@ namespace Connector_HTTP {
//write PAT and PMT TS packets
if (PacketNumber % 42 == 0){
PackData.DefaultPAT();
TSBuf.write(PackData.ToString(), 188);
HTTP_S.Chunkify(PackData.ToString(), 188, conn);
PackData.DefaultPMT();
TSBuf.write(PackData.ToString(), 188);
HTTP_S.Chunkify(PackData.ToString(), 188, conn);
PacketNumber += 2;
}
@ -310,7 +303,7 @@ namespace Connector_HTTP {
unsigned int toSend = PackData.AddStuffing(ToPack.bytes(184));
std::string gonnaSend = ToPack.remove(toSend);
PackData.FillFree(gonnaSend);
TSBuf.write(PackData.ToString(), 188);
HTTP_S.Chunkify(PackData.ToString(), 188, conn);
PacketNumber++;
//rest of packets
@ -321,7 +314,7 @@ namespace Connector_HTTP {
toSend = PackData.AddStuffing(ToPack.bytes(184));
gonnaSend = ToPack.remove(toSend);
PackData.FillFree(gonnaSend);
TSBuf.write(PackData.ToString(), 188);
HTTP_S.Chunkify(PackData.ToString(), 188, conn);
PacketNumber++;
}

View file

@ -54,12 +54,7 @@ namespace Connector_HTTP {
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
conn.setHost(HTTP_R.GetHeader("X-Origin"));
//we assume the URL is the stream name with a 3 letter extension
streamname = HTTP_R.getUrl().substr(1);
size_t extDot = streamname.rfind('.');
if (extDot != std::string::npos){
streamname.resize(extDot);
}; //strip the extension
streamname = HTTP_R.GetHeader("X-Stream");
int start = 0;
if ( !HTTP_R.GetVar("start").empty()){
start = atoi(HTTP_R.GetVar("start").c_str());
@ -126,7 +121,9 @@ namespace Connector_HTTP {
byterate += Strm.getTrackById(audioID)["bps"].asInt();
}
if ( !byterate){byterate = 1;}
if (seek_byte){
seek_sec = (seek_byte / byterate) * 1000;
}
std::stringstream cmd;
cmd << "t";
if (videoID != -1){

View file

@ -52,12 +52,7 @@ namespace Connector_HTTP {
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
conn.setHost(HTTP_R.GetHeader("X-Origin"));
//we assume the URL is the stream name with a 3 letter extension
streamname = HTTP_R.getUrl().substr(1);
size_t extDot = streamname.rfind('.');
if (extDot != std::string::npos){
streamname.resize(extDot);
}; //strip the extension
streamname = HTTP_R.GetHeader("X-Stream");
int start = 0;
if ( !HTTP_R.GetVar("start").empty()){
start = atoi(HTTP_R.GetVar("start").c_str());
@ -121,7 +116,9 @@ namespace Connector_HTTP {
byterate += Strm.getTrackById(audioID)["bps"].asInt();
}
if ( !byterate){byterate = 1;}
if (seek_byte){
seek_sec = (seek_byte / byterate) * 1000;
}
std::stringstream cmd;
cmd << "t";
if (videoID != -1){

View file

@ -60,12 +60,7 @@ namespace Connector_HTTP {
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
conn.setHost(HTTP_R.GetHeader("X-Origin"));
//we assume the URL is the stream name with a 3 letter extension
streamname = HTTP_R.getUrl().substr(1);
size_t extDot = streamname.rfind('.');
if (extDot != std::string::npos){
streamname.resize(extDot);
}; //strip the extension
streamname = HTTP_R.GetHeader("X-Stream");
int start = 0;
if ( !HTTP_R.GetVar("start").empty()){
start = atoi(HTTP_R.GetVar("start").c_str());
@ -132,7 +127,9 @@ namespace Connector_HTTP {
byterate += Strm.getTrackById(audioID)["bps"].asInt();
}
if ( !byterate){byterate = 1;}
if (seek_byte){
seek_sec = (seek_byte / byterate) * 1000;
}
std::stringstream cmd;
cmd << "t";
if (videoID != -1){

View file

@ -60,10 +60,10 @@ namespace Connector_HTTP {
}
if (oIt->second["type"].asString() == "video"){
allVideo[oIt->first] = oIt->second;
if (oIt->second["width"].asInt() > maxWidth){ maxWidth = oIt->second["width"].asInt(); }
if (oIt->second["width"].asInt() < minWidth){ minWidth = oIt->second["width"].asInt(); }
if (oIt->second["height"].asInt() > maxHeight){ maxHeight = oIt->second["height"].asInt(); }
if (oIt->second["height"].asInt() < minHeight){ minHeight = oIt->second["height"].asInt(); }
if (oIt->second["width"].asInt() > maxWidth){maxWidth = oIt->second["width"].asInt();}
if (oIt->second["width"].asInt() < minWidth){minWidth = oIt->second["width"].asInt();}
if (oIt->second["height"].asInt() > maxHeight){maxHeight = oIt->second["height"].asInt();}
if (oIt->second["height"].asInt() < minHeight){minHeight = oIt->second["height"].asInt();}
}
}
@ -164,6 +164,7 @@ namespace Connector_HTTP {
bool ready4data = false;//Set to true when streaming is to begin.
Socket::Connection ss( -1);//The Stream Socket, used to connect to the desired stream.
std::string streamname;//Will contain the name of the stream.
bool handlingRequest = false;
bool wantsVideo = false;//Indicates whether this request is a video request.
bool wantsAudio = false;//Indicates whether this request is an audio request.
@ -178,11 +179,12 @@ namespace Connector_HTTP {
JSON::Value allVideo;
while (conn.connected()){
if ( !handlingRequest){
if (conn.spool() || conn.Received().size()){
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
#endif
//Get data set by the proxy.
conn.setHost(HTTP_R.GetHeader("X-Origin"));
streamname = HTTP_R.GetHeader("X-Stream");
@ -324,10 +326,6 @@ namespace Connector_HTTP {
sstream << "s " << (requestedTime / 10000) << "\np " << (mstime + mslen) <<"\n";
ss.SendNow(sstream.str().c_str());
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.SetBody("");
unsigned int myDuration;
//Wrap everything in mp4 boxes
@ -405,14 +403,12 @@ namespace Connector_HTTP {
traf_box.setContent(trun_box, 1);
moof_box.setContent(traf_box, 1);
//Send the complete message
HTTP_S.SetHeader("Content-Length", keyObj["size"].asInt() + 8 + moof_box.boxedSize());
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
conn.SendNow(moof_box.asBox(), moof_box.boxedSize());
unsigned long size = htonl(keyObj["size"].asInt() + 8);
conn.SendNow((char*) &size, 4);
conn.SendNow("mdat", 4);
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.StartResponse(HTTP_R, conn);
HTTP_S.Chunkify(moof_box.asBox(), moof_box.boxedSize(), conn);
HTTP_S.Chunkify("\000\000\000\000mdat", 8, conn);
handlingRequest = true;
}else{
//We have a request for a Manifest, generate and send it.
HTTP_S.Clean();
@ -427,8 +423,9 @@ namespace Connector_HTTP {
HTTP_R.Clean();
}
}else{
//Wait 1 second before checking for new data.
Util::sleep(1);
//Wait 250ms before checking for new data.
Util::sleep(250);
}
}
if (ready4data){
unsigned int now = Util::epoch();
@ -437,12 +434,14 @@ namespace Connector_HTTP {
lastStats = now;
ss.SendNow(conn.getStats("HTTP_Smooth").c_str());
}
if (ss.spool()){
if (handlingRequest && ss.spool()){
while (Strm.parsePacket(ss.Received())){
if (Strm.lastType() == DTSC::AUDIO || Strm.lastType() == DTSC::VIDEO){
//Select only the data that the client has requested.
int tmp = Util::getMS();
conn.SendNow(Strm.lastData());
HTTP_S.Chunkify(Strm.lastData(), conn);
}
if (Strm.lastType() == DTSC::PAUSEMARK){
HTTP_S.Chunkify("", 0, conn);
handlingRequest = false;
}
}
}