diff --git a/src/buffer.cpp b/src/buffer.cpp index a79918a7..567a1566 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -61,14 +61,19 @@ namespace Buffer{ while (usr->S.connected()){ usleep(5000); //sleep 5ms - if (usr->S.spool() && usr->S.Received().find('\n') != std::string::npos){ - std::string cmd = usr->S.Received().substr(0, usr->S.Received().find('\n')); - usr->S.Received().erase(0, usr->S.Received().find('\n')+1); - if (cmd != ""){ - switch (cmd[0]){ + usr->Send(); + if (usr->S.spool() && usr->S.Received().size()){ + //delete anything that doesn't end with a newline + if (!usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){ + usr->S.Received().get().clear(); + continue; + } + usr->S.Received().get().resize(usr->S.Received().get().size() - 1); + if (!usr->S.Received().get().empty()){ + switch (usr->S.Received().get()[0]){ case 'P':{ //Push - std::cout << "Push attempt from IP " << cmd.substr(2) << std::endl; - if (thisStream->checkWaitingIP(cmd.substr(2))){ + std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl; + if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){ if (thisStream->setInput(usr->S)){ std::cout << "Push accepted!" << std::endl; usr->S = Socket::Connection(-1); @@ -81,7 +86,7 @@ namespace Buffer{ } } break; case 'S':{ //Stats - usr->tmpStats = Stats(cmd.substr(2)); + usr->tmpStats = Stats(usr->S.Received().get().substr(2)); unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; if (secs < 1){secs = 1;} usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; @@ -107,7 +112,6 @@ namespace Buffer{ } } } - usr->Send(); } usr->Disconnect("Socket closed."); thisStream->cleanUsers(); @@ -157,7 +161,7 @@ namespace Buffer{ if (thisStream->getIPInput().connected()){ if (thisStream->getIPInput().spool()){ thisStream->getWriteLock(); - if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){ + if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received().get())){ thisStream->getStream()->outPacket(0); thisStream->dropWriteLock(true); }else{ diff --git a/src/conn_http.cpp b/src/conn_http.cpp index 8319e4df..becd7ea4 100644 --- a/src/conn_http.cpp +++ b/src/conn_http.cpp @@ -241,9 +241,9 @@ namespace Connector_HTTP{ //wait for a response while (connconn.count(uid) && connconn[uid]->conn->connected() && conn->connected()){ conn->spool(); - if (connconn[uid]->conn->spool()){ + if (connconn[uid]->conn->Received().size() || connconn[uid]->conn->spool()){ //check if the whole response was received - if (H.Read(connconn[uid]->conn->Received())){ + if (H.Read(connconn[uid]->conn->Received().get())){ break;//continue down below this while loop } }else{ @@ -280,10 +280,10 @@ namespace Connector_HTTP{ connconn[uid]->in_use.unlock(); //continue sending data from this socket and keep it permanently in use while (myConn->connected() && conn->connected()){ - if (myConn->spool()){ + if (myConn->Received().size() || myConn->spool()){ //forward any and all incoming data directly without parsing - conn->Send(myConn->Received()); - myConn->Received().clear(); + conn->Send(myConn->Received().get()); + myConn->Received().get().clear(); conn->flush(); }else{ usleep(30000); @@ -338,8 +338,8 @@ namespace Connector_HTTP{ conn->setBlocking(false);//do not block on conn.spool() when no data is available HTTP::Parser Client; while (conn->connected()){ - if (conn->spool() || !conn->Received().empty()){ - if (Client.Read(conn->Received())){ + if (conn->Received().size() || conn->spool()){ + if (Client.Read(conn->Received().get())){ std::string handler = getHTTPType(Client); long long int startms = getNowMS(); #if DEBUG >= 4 @@ -358,11 +358,6 @@ namespace Connector_HTTP{ std::cout << "Completed request (" << conn->getSocket() << ") " << handler << " in " << (getNowMS() - startms) << " ms" << std::endl; #endif Client.Clean(); //clean for any possible next requests - }else{ - #if DEBUG >= 3 - fprintf(stderr, "Could not parse the following:\n%s\n", conn->Received().c_str()); - #endif - usleep(100000);//sleep 100ms } }else{ usleep(10000);//sleep 10ms diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index 2df92afd..ff4587e8 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -138,8 +138,8 @@ namespace Connector_HTTP{ conn.setBlocking(false);//do not block on conn.spool() when no data is available while (conn.connected()){ - if (conn.spool()){ - if (HTTP_R.Read(conn.Received())){ + if (conn.spool() || conn.Received().size()){ + if (HTTP_R.Read(conn.Received().get())){ #if DEBUG >= 4 std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; #endif @@ -196,10 +196,6 @@ namespace Connector_HTTP{ } ready4data = true; HTTP_R.Clean(); //clean for any possible next requests - }else{ - #if DEBUG >= 3 - fprintf(stderr, "Could not parse the following:\n%s\n", conn.Received().c_str()); - #endif } }else{ usleep(10000);//sleep 10ms @@ -231,7 +227,7 @@ namespace Connector_HTTP{ ss.Send("S "); ss.Send(conn.getStats("HTTP_Dynamic").c_str()); } - if (ss.spool() || ss.Received() != ""){ + if (ss.spool() || ss.Received().size()){ if (Strm.parsePacket(ss.Received())){ if (Strm.getPacket(0).isMember("time")){ if (!Strm.metadata.isMember("firsttime")){ @@ -272,7 +268,11 @@ namespace Connector_HTTP{ conn.Send(HTTP_S.BuildResponse("200", "OK")); Flash_RequestPending--; #if DEBUG >= 3 - fprintf(stderr, "Sending a fragment\n"); + fprintf(stderr, "Sending a fragment..."); + #endif + conn.flush(); + #if DEBUG >= 3 + fprintf(stderr, "Done\n"); #endif } } diff --git a/src/conn_http_progressive.cpp b/src/conn_http_progressive.cpp index dad3b6d5..83227aab 100644 --- a/src/conn_http_progressive.cpp +++ b/src/conn_http_progressive.cpp @@ -40,8 +40,8 @@ namespace Connector_HTTP{ while (conn.connected()){ //only parse input if available or not yet init'ed - if (conn.spool()){ - if (HTTP_R.Read(conn.Received())){ + if (conn.spool() || conn.Received().size()){ + if (HTTP_R.Read(conn.Received().get())){ #if DEBUG >= 4 std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; #endif @@ -53,10 +53,6 @@ namespace Connector_HTTP{ seek_pos = atoi(HTTP_R.GetVar("start").c_str()) * 1000;//seconds to ms ready4data = true; HTTP_R.Clean(); //clean for any possible next requests - }else{ - #if DEBUG >= 3 - fprintf(stderr, "Could not parse the following:\n%s\n", conn.Received().c_str()); - #endif } }else{ usleep(10000);//sleep 10ms @@ -94,7 +90,7 @@ namespace Connector_HTTP{ ss.Send("S "); ss.Send(conn.getStats("HTTP_Progressive").c_str()); } - if (ss.spool() || ss.Received() != ""){ + if (ss.spool() || ss.Received().size()){ if (Strm.parsePacket(ss.Received())){ tag.DTSCLoader(Strm); if (!progressive_has_sent_header){ diff --git a/src/conn_raw.cpp b/src/conn_raw.cpp index f231ab8d..f3c416a4 100644 --- a/src/conn_raw.cpp +++ b/src/conn_raw.cpp @@ -26,8 +26,10 @@ int main(int argc, char ** argv) { unsigned int started = time(0); while(std::cout.good()){ if (S.spool()){ - std::cout.write(S.Received().c_str(),S.Received().size()); - S.Received().clear(); + while (S.Received().size()){ + std::cout.write(S.Received().get().c_str(),S.Received().get().size()); + S.Received().get().clear(); + } }else{ usleep(10000);//sleep 10ms if no data } diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp index 35bea93f..355a1a63 100644 --- a/src/conn_rtmp.cpp +++ b/src/conn_rtmp.cpp @@ -52,15 +52,14 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ FLV::Tag tag, init_tag; DTSC::Stream Strm; - while (Socket.Received().size() < 1537 && Socket.connected()){Socket.spool(); usleep(5000);} - RTMPStream::handshake_in = Socket.Received().substr(0, 1537); - Socket.Received().erase(0, 1537); + while (!Socket.Received().available(1537) && Socket.connected()){Socket.spool(); usleep(5000);} + RTMPStream::handshake_in = Socket.Received().remove(1537); RTMPStream::rec_cnt += 1537; if (RTMPStream::doHandshake()){ Socket.Send(RTMPStream::handshake_out); - while (Socket.Received().size() < 1536 && Socket.connected()){Socket.spool(); usleep(5000);} - Socket.Received().erase(0, 1536); + while (!Socket.Received().available(1536) && Socket.connected()){Socket.spool(); usleep(5000);} + Socket.Received().remove(1536); RTMPStream::rec_cnt += 1536; #if DEBUG >= 4 fprintf(stderr, "Handshake succcess!\n"); @@ -73,12 +72,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } unsigned int lastStats = 0; - bool firstrun = true; while (Socket.connected()){ - if (Socket.spool() || firstrun){ - firstrun = false; - parseChunk(Socket.Received()); + if (Socket.Received().size() || Socket.spool()){ + parseChunk(Socket.Received().get()); }else{ usleep(10000);//sleep 10ms to prevent high CPU usage } @@ -108,9 +105,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ SS.Send(Socket.getStats("RTMP").c_str()); } } - if (SS.spool()){ - while (Strm.parsePacket(SS.Received())){ - + if (SS.spool() || SS.Received().size()){ + if (Strm.parsePacket(SS.Received())){ if (play_trans != -1){ //send a status reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); diff --git a/src/controller.cpp b/src/controller.cpp index fb50c629..84d3d111 100644 --- a/src/controller.cpp +++ b/src/controller.cpp @@ -383,7 +383,7 @@ int main(int argc, char ** argv){ Connector::Log("CONF", "Controller started"); conf.activate(); while (API_Socket.connected() && conf.is_active){ - usleep(100000); //sleep for 100 ms - prevents 100% CPU time + usleep(10000); //sleep for 10 ms - prevents 100% CPU time if (time(0) - processchecker > 10){ processchecker = time(0); @@ -443,9 +443,10 @@ int main(int argc, char ** argv){ break; } if (it->spool()){ - size_t newlines = it->Received().find("\n\n"); - while (newlines != std::string::npos){ - Request = JSON::fromString(it->Received().substr(0, newlines)); + while (it->Received().size()){ + it->Received().get().resize(it->Received().get().size() - 1); + Request = JSON::fromString(it->Received().get()); + it->Received().get().clear(); if (Request.isMember("buffer")){ std::string thisbuffer = Request["buffer"]; Connector::lastBuffer[thisbuffer] = time(0); @@ -488,8 +489,6 @@ int main(int argc, char ** argv){ } } } - it->Received().erase(0, newlines+2); - newlines = it->Received().find("\n\n"); } } } @@ -501,8 +500,8 @@ int main(int argc, char ** argv){ users.erase(it); break; } - if (it->C.spool()){ - if (it->H.Read(it->C.Received())){ + if (it->C.spool() || it->C.Received().size()){ + if (it->H.Read(it->C.Received().get())){ Response.null(); //make sure no data leaks from previous requests if (it->clientMode){ // In clientMode, requests are reversed. These are connections we initiated to GearBox. diff --git a/src/player.cpp b/src/player.cpp index ad8e01a7..0e0fd479 100644 --- a/src/player.cpp +++ b/src/player.cpp @@ -88,89 +88,72 @@ int main(int argc, char** argv){ Stats sts; while (in_out.connected() && std::cin.good() && std::cout.good() && (time(0) - lasttime < 60)){ - if (in_out.spool()){ - while (in_out.Received().find('\n') != std::string::npos){ - std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n')); - in_out.Received().erase(0, in_out.Received().find('\n')+1); - if (cmd != ""){ - switch (cmd[0]){ - case 'P':{ //Push - #if DEBUG >= 4 - std::cerr << "Received push - ignoring (" << cmd << ")" << std::endl; - #endif - in_out.close();//pushing to VoD makes no sense - } break; - case 'S':{ //Stats - if (!StatsSocket.connected()){ - StatsSocket = Socket::Connection("/tmp/mist/statistics", true); + if (in_out.Received().size() || in_out.spool()){ + //delete anything that doesn't end with a newline + if (!in_out.Received().get().empty() && *(in_out.Received().get().rbegin()) != '\n'){ + in_out.Received().get().clear(); + continue; + } + in_out.Received().get().resize(in_out.Received().get().size() - 1); + if (!in_out.Received().get().empty()){ + switch (in_out.Received().get()[0]){ + case 'P':{ //Push + #if DEBUG >= 4 + std::cerr << "Received push - ignoring (" << in_out.Received().get() << ")" << std::endl; + #endif + in_out.close();//pushing to VoD makes no sense + } break; + case 'S':{ //Stats + if (!StatsSocket.connected()){ + StatsSocket = Socket::Connection("/tmp/mist/statistics", true); + } + if (StatsSocket.connected()){ + sts = Stats(in_out.Received().get().substr(2)); + JSON::Value json_sts; + json_sts["vod"]["down"] = (long long int)sts.down; + json_sts["vod"]["up"] = (long long int)sts.up; + json_sts["vod"]["time"] = (long long int)sts.conntime; + json_sts["vod"]["host"] = sts.host; + json_sts["vod"]["connector"] = sts.connector; + json_sts["vod"]["filename"] = conf.getString("filename"); + json_sts["vod"]["now"] = (long long int)time(0); + json_sts["vod"]["start"] = (long long int)(time(0) - sts.conntime); + if (!meta_sent){ + json_sts["vod"]["meta"] = meta; + meta_sent = true; } - if (StatsSocket.connected()){ - sts = Stats(cmd.substr(2)); - JSON::Value json_sts; - json_sts["vod"]["down"] = (long long int)sts.down; - json_sts["vod"]["up"] = (long long int)sts.up; - json_sts["vod"]["time"] = (long long int)sts.conntime; - json_sts["vod"]["host"] = sts.host; - json_sts["vod"]["connector"] = sts.connector; - json_sts["vod"]["filename"] = conf.getString("filename"); - json_sts["vod"]["now"] = (long long int)time(0); - json_sts["vod"]["start"] = (long long int)(time(0) - sts.conntime); - if (!meta_sent){ - json_sts["vod"]["meta"] = meta; - meta_sent = true; - } - StatsSocket.Send(json_sts.toString().c_str()); - StatsSocket.Send("\n\n"); - StatsSocket.flush(); - } - } break; - case 's':{ //second-seek - #if DEBUG >= 4 - std::cerr << "Received ms-seek (" << cmd << ")" << std::endl; - #endif - int ms = JSON::Value(cmd.substr(2)).asInt(); - bool ret = source.seek_time(ms); - #if DEBUG >= 4 - std::cerr << "Second-seek completed (time " << ms << "ms) " << ret << std::endl; - #endif - } break; - case 'f':{ //frame-seek - #if DEBUG >= 4 - std::cerr << "Received frame-seek (" << cmd << ")" << std::endl; - #endif - bool ret = source.seek_frame(JSON::Value(cmd.substr(2)).asInt()); - #if DEBUG >= 4 - std::cerr << "Frame-seek completed " << ret << std::endl; - #endif - } break; - case 'p':{ //play - #if DEBUG >= 4 - std::cerr << "Received play" << std::endl; - #endif - playing = -1; - in_out.setBlocking(false); - } break; - case 'o':{ //once-play - #if DEBUG >= 4 - std::cerr << "Received once-play" << std::endl; - #endif - if (playing <= 0){playing = 1;} - ++playing; - in_out.setBlocking(false); - } break; - case 'q':{ //quit-playing - #if DEBUG >= 4 - std::cerr << "Received quit-playing" << std::endl; - #endif - playing = 0; - in_out.setBlocking(true); - } break; - } + StatsSocket.Send(json_sts.toString().c_str()); + StatsSocket.Send("\n\n"); + StatsSocket.flush(); + } + } break; + case 's':{ //second-seek + int ms = JSON::Value(in_out.Received().get().substr(2)).asInt(); + bool ret = source.seek_time(ms); + } break; + case 'f':{ //frame-seek + bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt()); + } break; + case 'p':{ //play + playing = -1; + in_out.setBlocking(false); + } break; + case 'o':{ //once-play + if (playing <= 0){playing = 1;} + ++playing; + in_out.setBlocking(false); + } break; + case 'q':{ //quit-playing + playing = 0; + in_out.setBlocking(true); + } break; } + in_out.Received().get().clear(); } } if (playing != 0){ now = getNowMS(); + /// \todo This makes no sense. We're timing for packets here, but sending a whole keyframe. Fix. ASAP. if (playing > 0 || now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) { source.seekNext(); lastTime = source.getJSON()["time"].asInt(); @@ -203,8 +186,9 @@ int main(int argc, char** argv){ } else { usleep(std::min(10000LL, lastTime - (now - timeDiff)) * 1000); } + }else{ + usleep(10000);//sleep 10ms } - usleep(10000);//sleep 10ms } StatsSocket.close();