From ad5718acc62eaad2bd6c0a9e891a876f5752fa30 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 18 Sep 2012 15:48:05 +0200 Subject: [PATCH] Several playback speed fixes and other minor improvements. --- src/buffer_stream.cpp | 7 +-- src/conn_http.cpp | 14 ++---- src/conn_http_dynamic.cpp | 6 +-- src/conn_http_progressive.cpp | 52 +++++++++++----------- src/conn_raw.cpp | 13 +++--- src/conn_rtmp.cpp | 84 ++++++++++++++++++++++++++++------- src/controller.cpp | 26 +++++------ src/player.cpp | 37 +++++++-------- 8 files changed, 137 insertions(+), 102 deletions(-) diff --git a/src/buffer_stream.cpp b/src/buffer_stream.cpp index 1956a28b..cafacf86 100644 --- a/src/buffer_stream.cpp +++ b/src/buffer_stream.cpp @@ -2,6 +2,7 @@ /// Contains definitions for buffer streams. #include "buffer_stream.h" +#include /// Stores the globally equal reference. Buffer::Stream * Buffer::Stream::ref = 0; @@ -47,7 +48,7 @@ Buffer::Stream::~Stream(){ /// Calculate and return the current statistics in JSON format. std::string & Buffer::Stream::getStats(){ static std::string ret; - unsigned int now = time(0); + long long int now = Util::epoch(); unsigned int tot_up = 0, tot_down = 0, tot_count = 0; stats_mutex.lock(); if (users.size() > 0){ @@ -125,7 +126,7 @@ void Buffer::Stream::saveStats(std::string username, Stats & stats){ Storage["curr"][username]["down"] = stats.down; Storage["curr"][username]["conntime"] = stats.conntime; Storage["curr"][username]["host"] = stats.host; - Storage["curr"][username]["start"] = (unsigned int) time(0) - stats.conntime; + Storage["curr"][username]["start"] = Util::epoch() - stats.conntime; stats_mutex.unlock(); } @@ -143,7 +144,7 @@ void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string Storage["log"][username]["down"] = stats.down; Storage["log"][username]["conntime"] = stats.conntime; Storage["log"][username]["host"] = stats.host; - Storage["log"][username]["start"] = (unsigned int)time(0) - stats.conntime; + Storage["log"][username]["start"] = Util::epoch() - stats.conntime; stats_mutex.unlock(); cleanUsers(); } diff --git a/src/conn_http.cpp b/src/conn_http.cpp index a83deb7a..80cdc53a 100644 --- a/src/conn_http.cpp +++ b/src/conn_http.cpp @@ -10,8 +10,6 @@ #include #include #include -#include -#include //for gettimeofday #include #include #include @@ -19,6 +17,7 @@ #include #include #include +#include #include "tinythread.h" #include "embed.js.h" @@ -324,13 +323,6 @@ namespace Connector_HTTP{ return "none"; } - /// Gets the current system time in milliseconds. - long long int getNowMS(){ - timeval t; - gettimeofday(&t, 0); - return t.tv_sec * 1000 + t.tv_usec/1000; - }//getNowMS - /// Thread for handling a single HTTP connection void Handle_HTTP_Connection(void * pointer){ Socket::Connection * conn = (Socket::Connection *)pointer; @@ -340,7 +332,7 @@ namespace Connector_HTTP{ if (conn->Received().size() || conn->spool()){ if (Client.Read(conn->Received().get())){ std::string handler = getHTTPType(Client); - long long int startms = getNowMS(); + long long int startms = Util::getMS(); #if DEBUG >= 4 std::cout << "Received request: " << Client.getUrl() << " (" << conn->getSocket() << ") => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl; #endif @@ -354,7 +346,7 @@ namespace Connector_HTTP{ Handle_Through_Connector(Client, conn, handler); } #if DEBUG >= 4 - std::cout << "Completed request (" << conn->getSocket() << ") " << handler << " in " << (getNowMS() - startms) << " ms" << std::endl; + std::cout << "Completed request (" << conn->getSocket() << ") " << handler << " in " << (Util::getMS() - startms) << " ms" << std::endl; #endif Client.Clean(); //clean for any possible next requests } diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index 6c8af80e..d711a7a7 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -22,6 +21,7 @@ #include #include #include +#include /// Holds everything unique to HTTP Dynamic Connector. namespace Connector_HTTP{ @@ -234,10 +234,9 @@ namespace Connector_HTTP{ #endif inited = true; } - unsigned int now = time(0); + unsigned int now = Util::epoch(); if (now != lastStats){ lastStats = now; - ss.Send("S "); ss.SendNow(conn.getStats("HTTP_Dynamic").c_str()); } if (ss.spool()){ @@ -347,7 +346,6 @@ namespace Connector_HTTP{ } } conn.close(); - ss.Send("S "); ss.SendNow(conn.getStats("HTTP_Dynamic").c_str()); ss.close(); #if DEBUG >= 1 diff --git a/src/conn_http_progressive.cpp b/src/conn_http_progressive.cpp index ab4adbd8..48484902 100644 --- a/src/conn_http_progressive.cpp +++ b/src/conn_http_progressive.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -19,6 +18,7 @@ #include #include #include +#include /// Holds everything unique to HTTP Progressive Connector. namespace Connector_HTTP{ @@ -36,26 +36,25 @@ namespace Connector_HTTP{ unsigned int lastStats = 0; unsigned int seek_pos = 0;//seek position in ms - conn.setBlocking(false);//do not block on conn.spool() when no data is available while (conn.connected()){ //only parse input if available or not yet init'ed - if (conn.spool() || conn.Received().size()){ - if (HTTP_R.Read(conn.Received().get())){ - #if DEBUG >= 4 - std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; - #endif - conn.setHost(HTTP_R.GetHeader("X-Origin")); - //we assume the URL is the stream name with a 3 letter extension - streamname = HTTP_R.getUrl().substr(1); - size_t extDot = streamname.rfind('.'); - if (extDot != std::string::npos){streamname.resize(extDot);};//strip the extension - seek_pos = atoi(HTTP_R.GetVar("start").c_str()) * 1000;//seconds to ms - ready4data = true; - HTTP_R.Clean(); //clean for any possible next requests + if (!inited){ + if (conn.Received().size() || conn.spool()){ + if (HTTP_R.Read(conn.Received().get())){ + #if DEBUG >= 4 + std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; + #endif + conn.setHost(HTTP_R.GetHeader("X-Origin")); + //we assume the URL is the stream name with a 3 letter extension + streamname = HTTP_R.getUrl().substr(1); + size_t extDot = streamname.rfind('.'); + if (extDot != std::string::npos){streamname.resize(extDot);};//strip the extension + seek_pos = atoi(HTTP_R.GetVar("start").c_str()) * 1000;//seconds to ms + ready4data = true; + HTTP_R.Clean(); //clean for any possible next requests + } } - }else{ - usleep(1000);//sleep 1ms } if (ready4data){ if (!inited){ @@ -83,10 +82,9 @@ namespace Connector_HTTP{ ss.SendNow("p\n"); inited = true; } - unsigned int now = time(0); + unsigned int now = Util::epoch(); if (now != lastStats){ lastStats = now; - ss.Send("S "); ss.SendNow(conn.getStats("HTTP_Progressive").c_str()); } if (ss.spool()){ @@ -98,19 +96,18 @@ namespace Connector_HTTP{ HTTP_S.protocol = "HTTP/1.0"; conn.SendNow(HTTP_S.BuildResponse("200", "OK"));//no SetBody = unknown length - this is intentional, we will stream the entire file conn.SendNow(FLV::Header, 13);//write FLV header - static FLV::Tag tmp; //write metadata - tmp.DTSCMetaInit(Strm); - conn.SendNow(tmp.data, tmp.len); + tag.DTSCMetaInit(Strm); + conn.SendNow(tag.data, tag.len); //write video init data, if needed if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){ - tmp.DTSCVideoInit(Strm); - conn.SendNow(tmp.data, tmp.len); + tag.DTSCVideoInit(Strm); + conn.SendNow(tag.data, tag.len); } //write audio init data, if needed if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ - tmp.DTSCAudioInit(Strm); - conn.SendNow(tmp.data, tmp.len); + tag.DTSCAudioInit(Strm); + conn.SendNow(tag.data, tag.len); } progressive_has_sent_header = true; #if DEBUG >= 1 @@ -120,12 +117,13 @@ namespace Connector_HTTP{ tag.DTSCLoader(Strm); conn.SendNow(tag.data, tag.len);//write the tag contents } + }else{ + Util::sleep(1); } if (!ss.connected()){break;} } } conn.close(); - ss.Send("S "); ss.SendNow(conn.getStats("HTTP_Dynamic").c_str()); ss.close(); #if DEBUG >= 1 diff --git a/src/conn_raw.cpp b/src/conn_raw.cpp index 2c8d8242..6aa0a1e7 100644 --- a/src/conn_raw.cpp +++ b/src/conn_raw.cpp @@ -6,6 +6,7 @@ #include #include #include +#include /// Contains the main code for the RAW connector. /// Expects a single commandline argument telling it which stream to connect to, @@ -22,8 +23,8 @@ int main(int argc, char ** argv) { std::cout << "Could not open stream " << conf.getString("stream_name") << std::endl; return 1; } - unsigned int lastStats = 0; - unsigned int started = time(0); + long long int lastStats = 0; + long long int started = Util::epoch(); while(std::cout.good()){ if (S.spool()){ while (S.Received().size()){ @@ -31,18 +32,18 @@ int main(int argc, char ** argv) { S.Received().get().clear(); } }else{ - usleep(10000);//sleep 10ms if no data + Util::sleep(10);//sleep 10ms if no data } - unsigned int now = time(0); + unsigned int now = Util::epoch(); if (now != lastStats){ lastStats = now; std::stringstream st; - st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; + st << "S localhost RAW " << (Util::epoch() - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; S.SendNow(st.str().c_str()); } } std::stringstream st; - st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; + st << "S localhost RAW " << (Util::epoch() - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; S.SendNow(st.str().c_str()); S.close(); return 0; diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp index 441a060d..cc28fca7 100644 --- a/src/conn_rtmp.cpp +++ b/src/conn_rtmp.cpp @@ -17,6 +17,7 @@ #include #include #include +#include /// Holds all functions and data unique to the RTMP Connector namespace Connector_RTMP{ @@ -38,7 +39,7 @@ namespace Connector_RTMP{ Socket::Connection Socket; ///< Socket connected to user Socket::Connection SS; ///< Socket connected to server std::string streamname; ///< Stream that will be opened - void parseChunk(std::string & buffer);///< Parses a single RTMP chunk. + void parseChunk(Socket::Buffer & buffer);///< Parses a single RTMP chunk. void sendCommand(AMF::Object & amfreply, int messagetype, int stream_id);///< Sends a RTMP command either in AMF or AMF3 mode. void parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id);///< Parses a single AMF command message. int Connector_RTMP(Socket::Connection conn); @@ -52,13 +53,13 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ FLV::Tag tag, init_tag; DTSC::Stream Strm; - while (!Socket.Received().available(1537) && Socket.connected()){Socket.spool(); usleep(5000);} + while (!Socket.Received().available(1537) && Socket.connected()){Socket.spool(); Util::sleep(5);} RTMPStream::handshake_in = Socket.Received().remove(1537); RTMPStream::rec_cnt += 1537; if (RTMPStream::doHandshake()){ Socket.SendNow(RTMPStream::handshake_out); - while (!Socket.Received().available(1536) && Socket.connected()){Socket.spool(); usleep(5000);} + while (!Socket.Received().available(1536) && Socket.connected()){Socket.spool(); Util::sleep(5);} Socket.Received().remove(1536); RTMPStream::rec_cnt += 1536; #if DEBUG >= 4 @@ -72,14 +73,14 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } unsigned int lastStats = 0; + bool firsttime = true; while (Socket.connected()){ - if (Socket.spool() || Socket.Received().size()){ - while (Socket.Received().size()){ - parseChunk(Socket.Received().get()); - } + if (Socket.spool() || firsttime){ + parseChunk(Socket.Received()); + firsttime = false; }else{ - usleep(1000);//sleep 1ms to prevent high CPU usage + Util::sleep(1);//sleep 1ms to prevent high CPU usage } if (ready4data){ if (!inited){ @@ -100,10 +101,9 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ inited = true; } if (inited && !nostats){ - unsigned int now = time(0); + long long int now = Util::epoch(); if (now != lastStats){ lastStats = now; - SS.Send("S "); SS.SendNow(Socket.getStats("RTMP").c_str()); } } @@ -172,7 +172,6 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } } Socket.close(); - SS.Send("S "); SS.SendNow(Socket.getStats("RTMP").c_str()); SS.close(); #if DEBUG >= 1 @@ -192,7 +191,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ }//Connector_RTMP /// Tries to get and parse one RTMP chunk at a time. -void Connector_RTMP::parseChunk(std::string & inbuffer){ +void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){ //for DTSC conversion static JSON::Value meta_out; static std::stringstream prebuffer; // Temporary buffer before sending real data @@ -240,10 +239,6 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ RTMPStream::snd_window_at = RTMPStream::snd_cnt; break; case 4:{ - #if DEBUG >= 4 - short int ucmtype = ntohs(*(short int*)next.data.c_str()); - fprintf(stderr, "CTRL: User control message %hi\n", ucmtype); - #endif //2 bytes event type, rest = event data //types: //0 = stream begin, 4 bytes ID @@ -254,6 +249,19 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ //6 = pingrequest, 4 bytes data //7 = pingresponse, 4 bytes data //we don't need to process this + #if DEBUG >= 4 + short int ucmtype = ntohs(*(short int*)next.data.c_str()); + switch (ucmtype){ + case 0: fprintf(stderr, "CTRL: UCM StreamBegin %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; + case 1: fprintf(stderr, "CTRL: UCM StreamEOF %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; + case 2: fprintf(stderr, "CTRL: UCM StreamDry %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; + case 3: fprintf(stderr, "CTRL: UCM SetBufferLength %i %i\n", ntohl(*((int*)(next.data.c_str()+2))), ntohl(*((int*)(next.data.c_str()+6)))); break; + case 4: fprintf(stderr, "CTRL: UCM StreamIsRecorded %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; + case 6: fprintf(stderr, "CTRL: UCM PingRequest %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; + case 7: fprintf(stderr, "CTRL: UCM PingResponse %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break; + default: fprintf(stderr, "CTRL: UCM Unknown (%hi)\n", ucmtype); break; + } + #endif } break; case 5://window size of other end #if DEBUG >= 4 @@ -510,11 +518,55 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int play_msgtype = messagetype; play_streamid = stream_id; stream_inited = false; + + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Seek.Notify")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Seeking to the specified time")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfreply, play_msgtype, play_streamid); SS.Send("s "); SS.Send(JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString().c_str()); SS.Send("\n"); return; }//seek + if ((amfdata.getContentP(0)->StrValue() == "pauseRaw") || (amfdata.getContentP(0)->StrValue() == "pause")){ + if (amfdata.getContentP(3)->NumValue()){ + SS.Send("q\n");//quit playing + //send a status reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Pause.Notify")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Pausing playback")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfreply, play_msgtype, play_streamid); + }else{ + SS.Send("p\n");//start playing + //send a status reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Unpause.Notify")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Resuming playback")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfreply, play_msgtype, play_streamid); + } + return; + }//seek #if DEBUG >= 2 fprintf(stderr, "AMF0 command not processed! :(\n"); diff --git a/src/controller.cpp b/src/controller.cpp index 84d3d111..e962d3b2 100644 --- a/src/controller.cpp +++ b/src/controller.cpp @@ -11,14 +11,12 @@ #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include @@ -30,6 +28,7 @@ #include #include #include +#include #include "server.html.h" #define UPLINK_INTERVAL 30 @@ -87,7 +86,7 @@ void Log(std::string kind, std::string message){ if ((*it)[2] == message){return;} } JSON::Value m; - m.append((long long int)time(0)); + m.append(Util::epoch()); m.append(kind); m.append(message); Storage["log"].append(m); @@ -261,16 +260,15 @@ void startStream(std::string name, JSON::Value & data){ } void CheckStats(JSON::Value & stats){ - unsigned int currTime = time(0); + long long int currTime = Util::epoch(); for (JSON::ObjIter jit = stats.ObjBegin(); jit != stats.ObjEnd(); jit++){ if (currTime - lastBuffer[jit->first] > 120){ stats.removeMember(jit->first); return; }else{ if (jit->second.isMember("curr") && jit->second["curr"].size() > 0){ - long long int nowtime = time(0); for (JSON::ObjIter u_it = jit->second["curr"].ObjBegin(); u_it != jit->second["curr"].ObjEnd(); ++u_it){ - if (u_it->second.isMember("now") && u_it->second["now"].asInt() < nowtime - 3){ + if (u_it->second.isMember("now") && u_it->second["now"].asInt() < currTime - 3){ jit->second["log"].append(u_it->second); jit->second["curr"].removeMember(u_it->first); if (!jit->second["curr"].size()){break;} @@ -283,7 +281,7 @@ void CheckStats(JSON::Value & stats){ } void CheckAllStreams(JSON::Value & data){ - unsigned int currTime = time(0); + long long int currTime = Util::epoch(); bool changed = false; for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){ if (!Util::Procs::isActive(jit->first)){ @@ -385,14 +383,14 @@ int main(int argc, char ** argv){ while (API_Socket.connected() && conf.is_active){ usleep(10000); //sleep for 10 ms - prevents 100% CPU time - if (time(0) - processchecker > 10){ - processchecker = time(0); + if (Util::epoch() - processchecker > 10){ + processchecker = Util::epoch(); Connector::CheckProtocols(Connector::Storage["config"]["protocols"]); Connector::CheckAllStreams(Connector::Storage["streams"]); Connector::CheckStats(Connector::Storage["statistics"]); } - if (conf.getBool("uplink") && time(0) - lastuplink > UPLINK_INTERVAL){ - lastuplink = time(0); + if (conf.getBool("uplink") && Util::epoch() - lastuplink > UPLINK_INTERVAL){ + lastuplink = Util::epoch(); bool gotUplink = false; if (users.size() > 0){ for( std::vector< Connector::ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) { @@ -449,7 +447,7 @@ int main(int argc, char ** argv){ it->Received().get().clear(); if (Request.isMember("buffer")){ std::string thisbuffer = Request["buffer"]; - Connector::lastBuffer[thisbuffer] = time(0); + Connector::lastBuffer[thisbuffer] = Util::epoch(); if (Request.isMember("meta")){ Connector::Storage["statistics"][thisbuffer]["meta"] = Request["meta"]; } @@ -470,7 +468,7 @@ int main(int argc, char ** argv){ std::string thisfile = Request["vod"]["filename"]; for (JSON::ObjIter oit = Connector::Storage["streams"].ObjBegin(); oit != Connector::Storage["streams"].ObjEnd(); ++oit){ if (oit->second["channel"]["URL"].asString() == thisfile){ - Connector::lastBuffer[oit->first] = time(0); + Connector::lastBuffer[oit->first] = Util::epoch(); if (Request["vod"].isMember("meta")){ Connector::Storage["statistics"][oit->first]["meta"] = Request["vod"]["meta"]; } @@ -563,7 +561,7 @@ int main(int argc, char ** argv){ Response["config"] = Connector::Storage["config"]; Response["streams"] = Connector::Storage["streams"]; //add required data to the current unix time to the config, for syncing reasons - Response["config"]["time"] = (long long int)time(0); + Response["config"]["time"] = Util::epoch(); if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";} //sent any available logs and statistics Response["log"] = Connector::Storage["log"]; diff --git a/src/player.cpp b/src/player.cpp index 263d28b3..dab3acd7 100644 --- a/src/player.cpp +++ b/src/player.cpp @@ -9,6 +9,7 @@ #include #include #include +#include /// Copy of stats from buffer_user.cpp class Stats{ @@ -47,14 +48,6 @@ class Stats{ }; }; - -/// Gets the current system time in milliseconds. -long long int getNowMS(){ - timeval t; - gettimeofday(&t, 0); - return t.tv_sec * 1000 + t.tv_usec/1000; -}//getNowMS - int main(int argc, char** argv){ Util::Config conf(argv[0], PACKAGE_VERSION); conf.addOption("filename", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the file to write to stdout.\"}")); @@ -70,7 +63,7 @@ int main(int argc, char** argv){ pausemark["time"] = (long long int)0; Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); - int lasttime = time(0);//time last packet was sent + int lasttime = Util::epoch();//time last packet was sent //send the header { @@ -91,7 +84,7 @@ int main(int argc, char** argv){ long long bench = 0;//for benchmarking Stats sts; - while (in_out.connected() && std::cin.good() && std::cout.good() && (time(0) - lasttime < 60)){ + while (in_out.connected() && std::cin.good() && std::cout.good() && (Util::epoch() - lasttime < 60)){ if (in_out.spool()){ while (in_out.Received().size()){ //delete anything that doesn't end with a newline @@ -121,8 +114,8 @@ int main(int argc, char** argv){ json_sts["vod"]["host"] = sts.host; json_sts["vod"]["connector"] = sts.connector; json_sts["vod"]["filename"] = conf.getString("filename"); - json_sts["vod"]["now"] = (long long int)time(0); - json_sts["vod"]["start"] = (long long int)(time(0) - sts.conntime); + json_sts["vod"]["now"] = Util::epoch(); + json_sts["vod"]["start"] = Util::epoch() - sts.conntime; if (!meta_sent){ json_sts["vod"]["meta"] = meta; meta_sent = true; @@ -135,12 +128,15 @@ int main(int argc, char** argv){ case 's':{ //second-seek int ms = JSON::Value(in_out.Received().get().substr(2)).asInt(); bool ret = source.seek_time(ms); + lastTime = 0; } break; case 'f':{ //frame-seek bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt()); + lastTime = 0; } break; case 'p':{ //play playing = -1; + lastTime = 0; in_out.setBlocking(false); } break; case 'o':{ //once-play @@ -150,7 +146,7 @@ int main(int argc, char** argv){ #if DEBUG >= 4 std::cerr << "Playing one keyframe" << std::endl; #endif - bench = getNowMS(); + bench = Util::getMS(); } break; case 'q':{ //quit-playing playing = 0; @@ -162,15 +158,17 @@ int main(int argc, char** argv){ } } if (playing != 0){ - now = getNowMS(); - if (playing > 0 || meta["video"]["keyms"].asInt() <= now-lastTime) { + now = Util::getMS(); source.seekNext(); if (source.getJSON().isMember("keyframe")){ + if (playing == -1 && meta["video"]["keyms"].asInt() > now-lastTime) { + Util::sleep(meta["video"]["keyms"].asInt()-(now-lastTime)); + } lastTime = now; if (playing > 0){--playing;} if (playing == 0){ #if DEBUG >= 4 - std::cerr << "Sending pause_marker (" << (getNowMS() - bench) << "ms)" << std::endl; + std::cerr << "Sending pause_marker (" << (Util::getMS() - bench) << "ms)" << std::endl; #endif pausemark["time"] = (long long int)now; pausemark.toPacked(); @@ -179,7 +177,7 @@ int main(int argc, char** argv){ } } if (playing != 0){ - lasttime = time(0); + lasttime = Util::epoch(); //insert proper header for this type of data in_out.Send("DTPD"); //insert the packet length @@ -187,11 +185,8 @@ int main(int argc, char** argv){ in_out.Send((char*)&size, 4); in_out.SendNow(source.getPacket()); } - } else { - usleep((meta["video"]["keyms"].asInt()-(now-lastTime))*1000); - } }else{ - usleep(10000);//sleep 10ms + Util::sleep(10); } }