Changed HTTP connectors to make use of new HTTP library capabilities.

This commit is contained in:
Thulinma 2013-08-21 12:01:26 +02:00
parent 4a8abaf644
commit a40db6126f
7 changed files with 75 additions and 179 deletions

View file

@ -354,18 +354,8 @@ namespace Connector_HTTP {
while (myCConn->conn->connected() && conn->connected()){
conn->spool();
if (myCConn->conn->Received().size() || myCConn->conn->spool()){
//make sure we end in a \n
if ( *(myCConn->conn->Received().get().rbegin()) != '\n'){
std::string tmp = myCConn->conn->Received().get();
myCConn->conn->Received().get().clear();
if (myCConn->conn->Received().size()){
myCConn->conn->Received().get().insert(0, tmp);
}else{
myCConn->conn->Received().append(tmp);
}
}
//check if the whole header was received
if (H.Read(myCConn->conn->Received().get())){
if (H.Read(*(myCConn->conn))){
//208 means the fragment is too new, retry in 3s
if (H.url == "208"){
while (myCConn->conn->Received().size() > 0){
@ -413,30 +403,18 @@ namespace Connector_HTTP {
}else{
long long int ret = Util::getMS();
//success, check type of response
if (H.GetHeader("Content-Length") != ""){
std::cout << "Response headers for " << orig_url << " received...";
if (H.GetHeader("Content-Length") != "" || H.GetHeader("Transfer-Encoding") == "chunked"){
//known length - simply re-send the request with added headers and continue
H.SetHeader("X-UID", uid);
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
H.body = "";
conn->SendNow(H.BuildResponse("200", "OK"));
unsigned int bodyLen = H.length;
while (bodyLen > 0 && conn->connected() && myCConn->conn->connected()){
if (myCConn->conn->Received().size() || myCConn->conn->spool()){
if (myCConn->conn->Received().get().size() <= bodyLen){
conn->SendNow(myCConn->conn->Received().get());
bodyLen -= myCConn->conn->Received().get().size();
myCConn->conn->Received().get().clear();
}else{
conn->SendNow(myCConn->conn->Received().get().c_str(), bodyLen);
myCConn->conn->Received().get().erase(0, bodyLen);
bodyLen = 0;
}
}else{
Util::sleep(5);
}
}
std::cout << "proxying..." << std::endl;
H.Proxy(*(myCConn->conn), *conn);
std::cout << "Proxying " << orig_url << " completed!" << std::endl;
myCConn->inUse.unlock();
}else{
std::cout << "progressin'..." << std::endl;
//unknown length
H.SetHeader("X-UID", uid);
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
@ -539,17 +517,7 @@ namespace Connector_HTTP {
HTTP::Parser Client;
while (conn->connected()){
if (conn->spool() || conn->Received().size()){
//make sure it ends in a \n
if ( *(conn->Received().get().rbegin()) != '\n'){
std::string tmp = conn->Received().get();
conn->Received().get().clear();
if (conn->Received().size()){
conn->Received().get().insert(0, tmp);
}else{
conn->Received().append(tmp);
}
}
if (Client.Read(conn->Received().get())){
if (Client.Read(*conn)){
std::string handler = proxyGetHandleType(Client);
#if DEBUG >= 4
std::cout << "Received request: " << Client.getUrl() << " (" << conn->getSocket() << ") => " << handler << " (" << Client.GetVar("stream")

View file

@ -90,9 +90,9 @@ namespace Connector_HTTP {
abst.setSegmentRunTable(asrt, 0);
abst.setFragmentRunTable(afrt, 0);
#if DEBUG >= 8
#if DEBUG >= 8
std::cout << "Sending bootstrap:" << std::endl << abst.toPrettyString(0) << std::endl;
#endif
#endif
return std::string((char*)abst.asBox(), (int)abst.boxedSize());
}
@ -133,9 +133,9 @@ namespace Connector_HTTP {
Result << " </media>" << std::endl;
}
Result << "</manifest>" << std::endl;
#if DEBUG >= 8
#if DEBUG >= 8
std::cerr << "Sending this manifest:" << std::endl << Result.str() << std::endl;
#endif
#endif
return Result.str();
} //BuildManifest
@ -143,9 +143,6 @@ namespace Connector_HTTP {
///\param conn A socket describing the connection the client.
///\return The exit code of the connector.
int dynamicConnector(Socket::Connection conn){
std::deque<std::string> FlashBuf;
int FlashBufSize = 0;
long long int FlashBufTime = 0;
FLV::Tag tmp; //temporary tag
DTSC::Stream Strm; //Incoming stream buffer.
@ -165,20 +162,10 @@ namespace Connector_HTTP {
while (conn.connected()){
if (conn.spool() || conn.Received().size()){
//make sure it ends in a \n
if ( *(conn.Received().get().rbegin()) != '\n'){
std::string tmp = conn.Received().get();
conn.Received().get().clear();
if (conn.Received().size()){
conn.Received().get().insert(0, tmp);
}else{
conn.Received().append(tmp);
}
}
if (HTTP_R.Read(conn.Received().get())){
#if DEBUG >= 5
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
#endif
conn.setHost(HTTP_R.GetHeader("X-Origin"));
streamname = HTTP_R.GetHeader("X-Stream");
if ( !ss){
@ -219,9 +206,9 @@ namespace Connector_HTTP {
Segment = atoi(HTTP_R.url.substr(temp, HTTP_R.url.find("-", temp) - temp).c_str());
temp = HTTP_R.url.find("Frag") + 4;
ReqFragment = atoi(HTTP_R.url.substr(temp).c_str());
#if DEBUG >= 5
#if DEBUG >= 5
printf("Video track %d, segment %d, fragment %d\n", Quality, Segment, ReqFragment);
#endif
#endif
if (!audioTrack){getTracks(Strm.metadata);}
JSON::Value & vidTrack = Strm.getTrackById(Quality);
mstime = 0;
@ -258,6 +245,28 @@ namespace Connector_HTTP {
std::stringstream sstream;
sstream << "t " << Quality << " " << audioTrack << "\ns " << mstime << "\np " << (mstime + mslen) << "\n";
ss.SendNow(sstream.str().c_str());
std::cout << sstream.str() << std::endl;
HTTP_S.Clean();
HTTP_S.protocol = "HTTP/1.1";
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.SetBody("");
std::string new_strap = dynamicBootstrap(streamname, Strm.getTrackById(Quality), Strm.metadata.isMember("live"), ReqFragment);
HTTP_S.SetHeader("Transfer-Encoding", "chunked");
HTTP_S.SendResponse("200", "OK", conn);
HTTP_S.Chunkify(new_strap, conn);
HTTP_S.Chunkify("\000\000\000\000mdat", 8, conn);
//fill buffer with init data, if needed.
if (audioTrack > 0 && Strm.getTrackById(audioTrack).isMember("init")){
tmp.DTSCAudioInit(Strm.getTrackById(audioTrack));
tmp.tagTime(mstime);
HTTP_S.Chunkify(tmp.data, tmp.len, conn);
}
if (Quality > 0 && Strm.getTrackById(Quality).isMember("init")){
tmp.DTSCVideoInit(Strm.getTrackById(Quality));
tmp.tagTime(mstime);
HTTP_S.Chunkify(tmp.data, tmp.len, conn);
}
}else{
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "text/xml");
@ -280,45 +289,15 @@ namespace Connector_HTTP {
if (ss.spool()){
while (Strm.parsePacket(ss.Received())){
if (Strm.lastType() == DTSC::PAUSEMARK){
if (FlashBufSize){
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/mp4");
HTTP_S.SetBody("");
std::string new_strap = dynamicBootstrap(streamname, Strm.getTrackById(Quality), Strm.metadata.isMember("live"), ReqFragment);
HTTP_S.SetHeader("Content-Length", FlashBufSize + 8 + new_strap.size()); //32+33+btstrp.size());
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
conn.SendNow(new_strap);
unsigned long size = htonl(FlashBufSize+8);
conn.SendNow((char*) &size, 4);
conn.SendNow("mdat", 4);
while (FlashBuf.size() > 0){
conn.SendNow(FlashBuf.front());
FlashBuf.pop_front();
}
}
FlashBuf.clear();
FlashBufSize = 0;
//send an empty chunk to signify request is done
std::string empty = "";
HTTP_S.Chunkify(empty, conn);
std::cout << "Finito!" << std::endl;
}
if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){
if (FlashBufSize == 0){
//fill buffer with init data, if needed.
if (audioTrack > 0 && Strm.getTrackById(audioTrack).isMember("init")){
tmp.DTSCAudioInit(Strm.getTrackById(audioTrack));
tmp.tagTime(mstime);
FlashBuf.push_back(std::string(tmp.data, tmp.len));
FlashBufSize += tmp.len;
}
if (Quality > 0 && Strm.getTrackById(Quality).isMember("init")){
tmp.DTSCVideoInit(Strm.getTrackById(Quality));
tmp.tagTime(mstime);
FlashBuf.push_back(std::string(tmp.data, tmp.len));
FlashBufSize += tmp.len;
}
FlashBufTime = mstime;
}
//send a chunk with the new data
tmp.DTSCLoader(Strm);
FlashBuf.push_back(std::string(tmp.data, tmp.len));
FlashBufSize += tmp.len;
HTTP_S.Chunkify(tmp.data, tmp.len, conn);
}
}
}
@ -367,9 +346,9 @@ int main(int argc, char ** argv){
if (myid == 0){ //if new child, start MAINHANDLER
return Connector_HTTP::dynamicConnector(S);
}else{ //otherwise, do nothing or output debugging text
#if DEBUG >= 3
#if DEBUG >= 3
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());
#endif
#endif
}
}
} //while connected

View file

@ -130,17 +130,7 @@ namespace Connector_HTTP {
while (conn.connected()){
if (conn.spool() || conn.Received().size()){
//make sure it ends in a \n
if ( *(conn.Received().get().rbegin()) != '\n'){
std::string tmp = conn.Received().get();
conn.Received().get().clear();
if (conn.Received().size()){
conn.Received().get().insert(0, tmp);
}else{
conn.Received().append(tmp);
}
}
if (HTTP_R.Read(conn.Received().get())){
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif

View file

@ -49,17 +49,7 @@ namespace Connector_HTTP {
//Only attempt to parse input when not yet init'ed.
if ( !inited){
if (conn.Received().size() || conn.spool()){
//make sure it ends in a \n
if ( *(conn.Received().get().rbegin()) != '\n'){
std::string tmp = conn.Received().get();
conn.Received().get().clear();
if (conn.Received().size()){
conn.Received().get().insert(0, tmp);
}else{
conn.Received().append(tmp);
}
}
if (HTTP_R.Read(conn.Received().get())){
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif

View file

@ -47,17 +47,7 @@ namespace Connector_HTTP {
//Only attempt to parse input when not yet init'ed.
if ( !inited){
if (conn.Received().size() || conn.spool()){
//make sure it ends in a \n
if ( *(conn.Received().get().rbegin()) != '\n'){
std::string tmp = conn.Received().get();
conn.Received().get().clear();
if (conn.Received().size()){
conn.Received().get().insert(0, tmp);
}else{
conn.Received().append(tmp);
}
}
if (HTTP_R.Read(conn.Received().get())){
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif

View file

@ -55,17 +55,7 @@ namespace Connector_HTTP {
//Only attempt to parse input when not yet init'ed.
if ( !inited){
if (conn.Received().size() || conn.spool()){
//make sure it ends in a \n
if ( *(conn.Received().get().rbegin()) != '\n'){
std::string tmp = conn.Received().get();
conn.Received().get().clear();
if (conn.Received().size()){
conn.Received().get().insert(0, tmp);
}else{
conn.Received().append(tmp);
}
}
if (HTTP_R.Read(conn.Received().get())){
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif

View file

@ -179,17 +179,7 @@ namespace Connector_HTTP {
while (conn.connected()){
if (conn.spool() || conn.Received().size()){
//Make sure the received data ends in a newline (\n).
if ( *(conn.Received().get().rbegin()) != '\n'){
std::string tmp = conn.Received().get();
conn.Received().get().clear();
if (conn.Received().size()){
conn.Received().get().insert(0, tmp);
}else{
conn.Received().append(tmp);
}
}
if (HTTP_R.Read(conn.Received().get())){
if (HTTP_R.Read(conn)){
#if DEBUG >= 5
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
@ -332,7 +322,6 @@ namespace Connector_HTTP {
sstream << "t " << myRef["trackid"].asInt() << "\n";
sstream << "s " << (requestedTime / 10000) << "\np " << (mstime + mslen) <<"\n";
std::cout << "Sending: " << sstream.str() << std::endl;
ss.SendNow(sstream.str().c_str());
HTTP_S.Clean();