Various optimalisations to improve performance - matches/requires recent edits to libmist. More coming soon.

This commit is contained in:
Thulinma 2012-09-16 01:18:56 +02:00
parent b52d182f07
commit 8ba5823e00
8 changed files with 113 additions and 137 deletions

View file

@ -61,14 +61,19 @@ namespace Buffer{
while (usr->S.connected()){ while (usr->S.connected()){
usleep(5000); //sleep 5ms usleep(5000); //sleep 5ms
if (usr->S.spool() && usr->S.Received().find('\n') != std::string::npos){ usr->Send();
std::string cmd = usr->S.Received().substr(0, usr->S.Received().find('\n')); if (usr->S.spool() && usr->S.Received().size()){
usr->S.Received().erase(0, usr->S.Received().find('\n')+1); //delete anything that doesn't end with a newline
if (cmd != ""){ if (!usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){
switch (cmd[0]){ usr->S.Received().get().clear();
continue;
}
usr->S.Received().get().resize(usr->S.Received().get().size() - 1);
if (!usr->S.Received().get().empty()){
switch (usr->S.Received().get()[0]){
case 'P':{ //Push case 'P':{ //Push
std::cout << "Push attempt from IP " << cmd.substr(2) << std::endl; std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl;
if (thisStream->checkWaitingIP(cmd.substr(2))){ if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){
if (thisStream->setInput(usr->S)){ if (thisStream->setInput(usr->S)){
std::cout << "Push accepted!" << std::endl; std::cout << "Push accepted!" << std::endl;
usr->S = Socket::Connection(-1); usr->S = Socket::Connection(-1);
@ -81,7 +86,7 @@ namespace Buffer{
} }
} break; } break;
case 'S':{ //Stats case 'S':{ //Stats
usr->tmpStats = Stats(cmd.substr(2)); usr->tmpStats = Stats(usr->S.Received().get().substr(2));
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
if (secs < 1){secs = 1;} if (secs < 1){secs = 1;}
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
@ -107,7 +112,6 @@ namespace Buffer{
} }
} }
} }
usr->Send();
} }
usr->Disconnect("Socket closed."); usr->Disconnect("Socket closed.");
thisStream->cleanUsers(); thisStream->cleanUsers();
@ -157,7 +161,7 @@ namespace Buffer{
if (thisStream->getIPInput().connected()){ if (thisStream->getIPInput().connected()){
if (thisStream->getIPInput().spool()){ if (thisStream->getIPInput().spool()){
thisStream->getWriteLock(); thisStream->getWriteLock();
if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){ if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received().get())){
thisStream->getStream()->outPacket(0); thisStream->getStream()->outPacket(0);
thisStream->dropWriteLock(true); thisStream->dropWriteLock(true);
}else{ }else{

View file

@ -241,9 +241,9 @@ namespace Connector_HTTP{
//wait for a response //wait for a response
while (connconn.count(uid) && connconn[uid]->conn->connected() && conn->connected()){ while (connconn.count(uid) && connconn[uid]->conn->connected() && conn->connected()){
conn->spool(); conn->spool();
if (connconn[uid]->conn->spool()){ if (connconn[uid]->conn->Received().size() || connconn[uid]->conn->spool()){
//check if the whole response was received //check if the whole response was received
if (H.Read(connconn[uid]->conn->Received())){ if (H.Read(connconn[uid]->conn->Received().get())){
break;//continue down below this while loop break;//continue down below this while loop
} }
}else{ }else{
@ -280,10 +280,10 @@ namespace Connector_HTTP{
connconn[uid]->in_use.unlock(); connconn[uid]->in_use.unlock();
//continue sending data from this socket and keep it permanently in use //continue sending data from this socket and keep it permanently in use
while (myConn->connected() && conn->connected()){ while (myConn->connected() && conn->connected()){
if (myConn->spool()){ if (myConn->Received().size() || myConn->spool()){
//forward any and all incoming data directly without parsing //forward any and all incoming data directly without parsing
conn->Send(myConn->Received()); conn->Send(myConn->Received().get());
myConn->Received().clear(); myConn->Received().get().clear();
conn->flush(); conn->flush();
}else{ }else{
usleep(30000); usleep(30000);
@ -338,8 +338,8 @@ namespace Connector_HTTP{
conn->setBlocking(false);//do not block on conn.spool() when no data is available conn->setBlocking(false);//do not block on conn.spool() when no data is available
HTTP::Parser Client; HTTP::Parser Client;
while (conn->connected()){ while (conn->connected()){
if (conn->spool() || !conn->Received().empty()){ if (conn->Received().size() || conn->spool()){
if (Client.Read(conn->Received())){ if (Client.Read(conn->Received().get())){
std::string handler = getHTTPType(Client); std::string handler = getHTTPType(Client);
long long int startms = getNowMS(); long long int startms = getNowMS();
#if DEBUG >= 4 #if DEBUG >= 4
@ -358,11 +358,6 @@ namespace Connector_HTTP{
std::cout << "Completed request (" << conn->getSocket() << ") " << handler << " in " << (getNowMS() - startms) << " ms" << std::endl; std::cout << "Completed request (" << conn->getSocket() << ") " << handler << " in " << (getNowMS() - startms) << " ms" << std::endl;
#endif #endif
Client.Clean(); //clean for any possible next requests Client.Clean(); //clean for any possible next requests
}else{
#if DEBUG >= 3
fprintf(stderr, "Could not parse the following:\n%s\n", conn->Received().c_str());
#endif
usleep(100000);//sleep 100ms
} }
}else{ }else{
usleep(10000);//sleep 10ms usleep(10000);//sleep 10ms

View file

@ -138,8 +138,8 @@ namespace Connector_HTTP{
conn.setBlocking(false);//do not block on conn.spool() when no data is available conn.setBlocking(false);//do not block on conn.spool() when no data is available
while (conn.connected()){ while (conn.connected()){
if (conn.spool()){ if (conn.spool() || conn.Received().size()){
if (HTTP_R.Read(conn.Received())){ if (HTTP_R.Read(conn.Received().get())){
#if DEBUG >= 4 #if DEBUG >= 4
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif #endif
@ -196,10 +196,6 @@ namespace Connector_HTTP{
} }
ready4data = true; ready4data = true;
HTTP_R.Clean(); //clean for any possible next requests HTTP_R.Clean(); //clean for any possible next requests
}else{
#if DEBUG >= 3
fprintf(stderr, "Could not parse the following:\n%s\n", conn.Received().c_str());
#endif
} }
}else{ }else{
usleep(10000);//sleep 10ms usleep(10000);//sleep 10ms
@ -231,7 +227,7 @@ namespace Connector_HTTP{
ss.Send("S "); ss.Send("S ");
ss.Send(conn.getStats("HTTP_Dynamic").c_str()); ss.Send(conn.getStats("HTTP_Dynamic").c_str());
} }
if (ss.spool() || ss.Received() != ""){ if (ss.spool() || ss.Received().size()){
if (Strm.parsePacket(ss.Received())){ if (Strm.parsePacket(ss.Received())){
if (Strm.getPacket(0).isMember("time")){ if (Strm.getPacket(0).isMember("time")){
if (!Strm.metadata.isMember("firsttime")){ if (!Strm.metadata.isMember("firsttime")){
@ -272,7 +268,11 @@ namespace Connector_HTTP{
conn.Send(HTTP_S.BuildResponse("200", "OK")); conn.Send(HTTP_S.BuildResponse("200", "OK"));
Flash_RequestPending--; Flash_RequestPending--;
#if DEBUG >= 3 #if DEBUG >= 3
fprintf(stderr, "Sending a fragment\n"); fprintf(stderr, "Sending a fragment...");
#endif
conn.flush();
#if DEBUG >= 3
fprintf(stderr, "Done\n");
#endif #endif
} }
} }

View file

@ -40,8 +40,8 @@ namespace Connector_HTTP{
while (conn.connected()){ while (conn.connected()){
//only parse input if available or not yet init'ed //only parse input if available or not yet init'ed
if (conn.spool()){ if (conn.spool() || conn.Received().size()){
if (HTTP_R.Read(conn.Received())){ if (HTTP_R.Read(conn.Received().get())){
#if DEBUG >= 4 #if DEBUG >= 4
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif #endif
@ -53,10 +53,6 @@ namespace Connector_HTTP{
seek_pos = atoi(HTTP_R.GetVar("start").c_str()) * 1000;//seconds to ms seek_pos = atoi(HTTP_R.GetVar("start").c_str()) * 1000;//seconds to ms
ready4data = true; ready4data = true;
HTTP_R.Clean(); //clean for any possible next requests HTTP_R.Clean(); //clean for any possible next requests
}else{
#if DEBUG >= 3
fprintf(stderr, "Could not parse the following:\n%s\n", conn.Received().c_str());
#endif
} }
}else{ }else{
usleep(10000);//sleep 10ms usleep(10000);//sleep 10ms
@ -94,7 +90,7 @@ namespace Connector_HTTP{
ss.Send("S "); ss.Send("S ");
ss.Send(conn.getStats("HTTP_Progressive").c_str()); ss.Send(conn.getStats("HTTP_Progressive").c_str());
} }
if (ss.spool() || ss.Received() != ""){ if (ss.spool() || ss.Received().size()){
if (Strm.parsePacket(ss.Received())){ if (Strm.parsePacket(ss.Received())){
tag.DTSCLoader(Strm); tag.DTSCLoader(Strm);
if (!progressive_has_sent_header){ if (!progressive_has_sent_header){

View file

@ -26,8 +26,10 @@ int main(int argc, char ** argv) {
unsigned int started = time(0); unsigned int started = time(0);
while(std::cout.good()){ while(std::cout.good()){
if (S.spool()){ if (S.spool()){
std::cout.write(S.Received().c_str(),S.Received().size()); while (S.Received().size()){
S.Received().clear(); std::cout.write(S.Received().get().c_str(),S.Received().get().size());
S.Received().get().clear();
}
}else{ }else{
usleep(10000);//sleep 10ms if no data usleep(10000);//sleep 10ms if no data
} }

View file

@ -52,15 +52,14 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
FLV::Tag tag, init_tag; FLV::Tag tag, init_tag;
DTSC::Stream Strm; DTSC::Stream Strm;
while (Socket.Received().size() < 1537 && Socket.connected()){Socket.spool(); usleep(5000);} while (!Socket.Received().available(1537) && Socket.connected()){Socket.spool(); usleep(5000);}
RTMPStream::handshake_in = Socket.Received().substr(0, 1537); RTMPStream::handshake_in = Socket.Received().remove(1537);
Socket.Received().erase(0, 1537);
RTMPStream::rec_cnt += 1537; RTMPStream::rec_cnt += 1537;
if (RTMPStream::doHandshake()){ if (RTMPStream::doHandshake()){
Socket.Send(RTMPStream::handshake_out); Socket.Send(RTMPStream::handshake_out);
while (Socket.Received().size() < 1536 && Socket.connected()){Socket.spool(); usleep(5000);} while (!Socket.Received().available(1536) && Socket.connected()){Socket.spool(); usleep(5000);}
Socket.Received().erase(0, 1536); Socket.Received().remove(1536);
RTMPStream::rec_cnt += 1536; RTMPStream::rec_cnt += 1536;
#if DEBUG >= 4 #if DEBUG >= 4
fprintf(stderr, "Handshake succcess!\n"); fprintf(stderr, "Handshake succcess!\n");
@ -73,12 +72,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
} }
unsigned int lastStats = 0; unsigned int lastStats = 0;
bool firstrun = true;
while (Socket.connected()){ while (Socket.connected()){
if (Socket.spool() || firstrun){ if (Socket.Received().size() || Socket.spool()){
firstrun = false; parseChunk(Socket.Received().get());
parseChunk(Socket.Received());
}else{ }else{
usleep(10000);//sleep 10ms to prevent high CPU usage usleep(10000);//sleep 10ms to prevent high CPU usage
} }
@ -108,9 +105,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
SS.Send(Socket.getStats("RTMP").c_str()); SS.Send(Socket.getStats("RTMP").c_str());
} }
} }
if (SS.spool()){ if (SS.spool() || SS.Received().size()){
while (Strm.parsePacket(SS.Received())){ if (Strm.parsePacket(SS.Received())){
if (play_trans != -1){ if (play_trans != -1){
//send a status reply //send a status reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);

View file

@ -383,7 +383,7 @@ int main(int argc, char ** argv){
Connector::Log("CONF", "Controller started"); Connector::Log("CONF", "Controller started");
conf.activate(); conf.activate();
while (API_Socket.connected() && conf.is_active){ while (API_Socket.connected() && conf.is_active){
usleep(100000); //sleep for 100 ms - prevents 100% CPU time usleep(10000); //sleep for 10 ms - prevents 100% CPU time
if (time(0) - processchecker > 10){ if (time(0) - processchecker > 10){
processchecker = time(0); processchecker = time(0);
@ -443,9 +443,10 @@ int main(int argc, char ** argv){
break; break;
} }
if (it->spool()){ if (it->spool()){
size_t newlines = it->Received().find("\n\n"); while (it->Received().size()){
while (newlines != std::string::npos){ it->Received().get().resize(it->Received().get().size() - 1);
Request = JSON::fromString(it->Received().substr(0, newlines)); Request = JSON::fromString(it->Received().get());
it->Received().get().clear();
if (Request.isMember("buffer")){ if (Request.isMember("buffer")){
std::string thisbuffer = Request["buffer"]; std::string thisbuffer = Request["buffer"];
Connector::lastBuffer[thisbuffer] = time(0); Connector::lastBuffer[thisbuffer] = time(0);
@ -488,8 +489,6 @@ int main(int argc, char ** argv){
} }
} }
} }
it->Received().erase(0, newlines+2);
newlines = it->Received().find("\n\n");
} }
} }
} }
@ -501,8 +500,8 @@ int main(int argc, char ** argv){
users.erase(it); users.erase(it);
break; break;
} }
if (it->C.spool()){ if (it->C.spool() || it->C.Received().size()){
if (it->H.Read(it->C.Received())){ if (it->H.Read(it->C.Received().get())){
Response.null(); //make sure no data leaks from previous requests Response.null(); //make sure no data leaks from previous requests
if (it->clientMode){ if (it->clientMode){
// In clientMode, requests are reversed. These are connections we initiated to GearBox. // In clientMode, requests are reversed. These are connections we initiated to GearBox.

View file

@ -88,89 +88,72 @@ int main(int argc, char** argv){
Stats sts; Stats sts;
while (in_out.connected() && std::cin.good() && std::cout.good() && (time(0) - lasttime < 60)){ while (in_out.connected() && std::cin.good() && std::cout.good() && (time(0) - lasttime < 60)){
if (in_out.spool()){ if (in_out.Received().size() || in_out.spool()){
while (in_out.Received().find('\n') != std::string::npos){ //delete anything that doesn't end with a newline
std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n')); if (!in_out.Received().get().empty() && *(in_out.Received().get().rbegin()) != '\n'){
in_out.Received().erase(0, in_out.Received().find('\n')+1); in_out.Received().get().clear();
if (cmd != ""){ continue;
switch (cmd[0]){ }
case 'P':{ //Push in_out.Received().get().resize(in_out.Received().get().size() - 1);
#if DEBUG >= 4 if (!in_out.Received().get().empty()){
std::cerr << "Received push - ignoring (" << cmd << ")" << std::endl; switch (in_out.Received().get()[0]){
#endif case 'P':{ //Push
in_out.close();//pushing to VoD makes no sense #if DEBUG >= 4
} break; std::cerr << "Received push - ignoring (" << in_out.Received().get() << ")" << std::endl;
case 'S':{ //Stats #endif
if (!StatsSocket.connected()){ in_out.close();//pushing to VoD makes no sense
StatsSocket = Socket::Connection("/tmp/mist/statistics", true); } break;
case 'S':{ //Stats
if (!StatsSocket.connected()){
StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
}
if (StatsSocket.connected()){
sts = Stats(in_out.Received().get().substr(2));
JSON::Value json_sts;
json_sts["vod"]["down"] = (long long int)sts.down;
json_sts["vod"]["up"] = (long long int)sts.up;
json_sts["vod"]["time"] = (long long int)sts.conntime;
json_sts["vod"]["host"] = sts.host;
json_sts["vod"]["connector"] = sts.connector;
json_sts["vod"]["filename"] = conf.getString("filename");
json_sts["vod"]["now"] = (long long int)time(0);
json_sts["vod"]["start"] = (long long int)(time(0) - sts.conntime);
if (!meta_sent){
json_sts["vod"]["meta"] = meta;
meta_sent = true;
} }
if (StatsSocket.connected()){ StatsSocket.Send(json_sts.toString().c_str());
sts = Stats(cmd.substr(2)); StatsSocket.Send("\n\n");
JSON::Value json_sts; StatsSocket.flush();
json_sts["vod"]["down"] = (long long int)sts.down; }
json_sts["vod"]["up"] = (long long int)sts.up; } break;
json_sts["vod"]["time"] = (long long int)sts.conntime; case 's':{ //second-seek
json_sts["vod"]["host"] = sts.host; int ms = JSON::Value(in_out.Received().get().substr(2)).asInt();
json_sts["vod"]["connector"] = sts.connector; bool ret = source.seek_time(ms);
json_sts["vod"]["filename"] = conf.getString("filename"); } break;
json_sts["vod"]["now"] = (long long int)time(0); case 'f':{ //frame-seek
json_sts["vod"]["start"] = (long long int)(time(0) - sts.conntime); bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt());
if (!meta_sent){ } break;
json_sts["vod"]["meta"] = meta; case 'p':{ //play
meta_sent = true; playing = -1;
} in_out.setBlocking(false);
StatsSocket.Send(json_sts.toString().c_str()); } break;
StatsSocket.Send("\n\n"); case 'o':{ //once-play
StatsSocket.flush(); if (playing <= 0){playing = 1;}
} ++playing;
} break; in_out.setBlocking(false);
case 's':{ //second-seek } break;
#if DEBUG >= 4 case 'q':{ //quit-playing
std::cerr << "Received ms-seek (" << cmd << ")" << std::endl; playing = 0;
#endif in_out.setBlocking(true);
int ms = JSON::Value(cmd.substr(2)).asInt(); } break;
bool ret = source.seek_time(ms);
#if DEBUG >= 4
std::cerr << "Second-seek completed (time " << ms << "ms) " << ret << std::endl;
#endif
} break;
case 'f':{ //frame-seek
#if DEBUG >= 4
std::cerr << "Received frame-seek (" << cmd << ")" << std::endl;
#endif
bool ret = source.seek_frame(JSON::Value(cmd.substr(2)).asInt());
#if DEBUG >= 4
std::cerr << "Frame-seek completed " << ret << std::endl;
#endif
} break;
case 'p':{ //play
#if DEBUG >= 4
std::cerr << "Received play" << std::endl;
#endif
playing = -1;
in_out.setBlocking(false);
} break;
case 'o':{ //once-play
#if DEBUG >= 4
std::cerr << "Received once-play" << std::endl;
#endif
if (playing <= 0){playing = 1;}
++playing;
in_out.setBlocking(false);
} break;
case 'q':{ //quit-playing
#if DEBUG >= 4
std::cerr << "Received quit-playing" << std::endl;
#endif
playing = 0;
in_out.setBlocking(true);
} break;
}
} }
in_out.Received().get().clear();
} }
} }
if (playing != 0){ if (playing != 0){
now = getNowMS(); now = getNowMS();
/// \todo This makes no sense. We're timing for packets here, but sending a whole keyframe. Fix. ASAP.
if (playing > 0 || now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) { if (playing > 0 || now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) {
source.seekNext(); source.seekNext();
lastTime = source.getJSON()["time"].asInt(); lastTime = source.getJSON()["time"].asInt();
@ -203,8 +186,9 @@ int main(int argc, char** argv){
} else { } else {
usleep(std::min(10000LL, lastTime - (now - timeDiff)) * 1000); usleep(std::min(10000LL, lastTime - (now - timeDiff)) * 1000);
} }
}else{
usleep(10000);//sleep 10ms
} }
usleep(10000);//sleep 10ms
} }
StatsSocket.close(); StatsSocket.close();