Several playback speed fixes and other minor improvements.
This commit is contained in:
parent
802f8a22b4
commit
ad5718acc6
8 changed files with 137 additions and 102 deletions
|
@ -2,6 +2,7 @@
|
|||
/// Contains definitions for buffer streams.
|
||||
|
||||
#include "buffer_stream.h"
|
||||
#include <mist/timing.h>
|
||||
|
||||
/// Stores the globally equal reference.
|
||||
Buffer::Stream * Buffer::Stream::ref = 0;
|
||||
|
@ -47,7 +48,7 @@ Buffer::Stream::~Stream(){
|
|||
/// Calculate and return the current statistics in JSON format.
|
||||
std::string & Buffer::Stream::getStats(){
|
||||
static std::string ret;
|
||||
unsigned int now = time(0);
|
||||
long long int now = Util::epoch();
|
||||
unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
|
||||
stats_mutex.lock();
|
||||
if (users.size() > 0){
|
||||
|
@ -125,7 +126,7 @@ void Buffer::Stream::saveStats(std::string username, Stats & stats){
|
|||
Storage["curr"][username]["down"] = stats.down;
|
||||
Storage["curr"][username]["conntime"] = stats.conntime;
|
||||
Storage["curr"][username]["host"] = stats.host;
|
||||
Storage["curr"][username]["start"] = (unsigned int) time(0) - stats.conntime;
|
||||
Storage["curr"][username]["start"] = Util::epoch() - stats.conntime;
|
||||
stats_mutex.unlock();
|
||||
}
|
||||
|
||||
|
@ -143,7 +144,7 @@ void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string
|
|||
Storage["log"][username]["down"] = stats.down;
|
||||
Storage["log"][username]["conntime"] = stats.conntime;
|
||||
Storage["log"][username]["host"] = stats.host;
|
||||
Storage["log"][username]["start"] = (unsigned int)time(0) - stats.conntime;
|
||||
Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
|
||||
stats_mutex.unlock();
|
||||
cleanUsers();
|
||||
}
|
||||
|
|
|
@ -10,8 +10,6 @@
|
|||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <getopt.h>
|
||||
#include <ctime>
|
||||
#include <sys/time.h>//for gettimeofday
|
||||
#include <set>
|
||||
#include <openssl/md5.h>
|
||||
#include <mist/socket.h>
|
||||
|
@ -19,6 +17,7 @@
|
|||
#include <mist/config.h>
|
||||
#include <mist/procs.h>
|
||||
#include <mist/stream.h>
|
||||
#include <mist/timing.h>
|
||||
#include "tinythread.h"
|
||||
#include "embed.js.h"
|
||||
|
||||
|
@ -324,13 +323,6 @@ namespace Connector_HTTP{
|
|||
return "none";
|
||||
}
|
||||
|
||||
/// Gets the current system time in milliseconds.
|
||||
long long int getNowMS(){
|
||||
timeval t;
|
||||
gettimeofday(&t, 0);
|
||||
return t.tv_sec * 1000 + t.tv_usec/1000;
|
||||
}//getNowMS
|
||||
|
||||
/// Thread for handling a single HTTP connection
|
||||
void Handle_HTTP_Connection(void * pointer){
|
||||
Socket::Connection * conn = (Socket::Connection *)pointer;
|
||||
|
@ -340,7 +332,7 @@ namespace Connector_HTTP{
|
|||
if (conn->Received().size() || conn->spool()){
|
||||
if (Client.Read(conn->Received().get())){
|
||||
std::string handler = getHTTPType(Client);
|
||||
long long int startms = getNowMS();
|
||||
long long int startms = Util::getMS();
|
||||
#if DEBUG >= 4
|
||||
std::cout << "Received request: " << Client.getUrl() << " (" << conn->getSocket() << ") => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl;
|
||||
#endif
|
||||
|
@ -354,7 +346,7 @@ namespace Connector_HTTP{
|
|||
Handle_Through_Connector(Client, conn, handler);
|
||||
}
|
||||
#if DEBUG >= 4
|
||||
std::cout << "Completed request (" << conn->getSocket() << ") " << handler << " in " << (getNowMS() - startms) << " ms" << std::endl;
|
||||
std::cout << "Completed request (" << conn->getSocket() << ") " << handler << " in " << (Util::getMS() - startms) << " ms" << std::endl;
|
||||
#endif
|
||||
Client.Clean(); //clean for any possible next requests
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <getopt.h>
|
||||
#include <ctime>
|
||||
#include <mist/socket.h>
|
||||
#include <mist/http_parser.h>
|
||||
#include <mist/json.h>
|
||||
|
@ -22,6 +21,7 @@
|
|||
#include <mist/config.h>
|
||||
#include <sstream>
|
||||
#include <mist/stream.h>
|
||||
#include <mist/timing.h>
|
||||
|
||||
/// Holds everything unique to HTTP Dynamic Connector.
|
||||
namespace Connector_HTTP{
|
||||
|
@ -234,10 +234,9 @@ namespace Connector_HTTP{
|
|||
#endif
|
||||
inited = true;
|
||||
}
|
||||
unsigned int now = time(0);
|
||||
unsigned int now = Util::epoch();
|
||||
if (now != lastStats){
|
||||
lastStats = now;
|
||||
ss.Send("S ");
|
||||
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
|
||||
}
|
||||
if (ss.spool()){
|
||||
|
@ -347,7 +346,6 @@ namespace Connector_HTTP{
|
|||
}
|
||||
}
|
||||
conn.close();
|
||||
ss.Send("S ");
|
||||
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
|
||||
ss.close();
|
||||
#if DEBUG >= 1
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <getopt.h>
|
||||
#include <ctime>
|
||||
#include <sstream>
|
||||
#include <mist/socket.h>
|
||||
#include <mist/http_parser.h>
|
||||
|
@ -19,6 +18,7 @@
|
|||
#include <mist/amf.h>
|
||||
#include <mist/config.h>
|
||||
#include <mist/stream.h>
|
||||
#include <mist/timing.h>
|
||||
|
||||
/// Holds everything unique to HTTP Progressive Connector.
|
||||
namespace Connector_HTTP{
|
||||
|
@ -36,11 +36,11 @@ namespace Connector_HTTP{
|
|||
|
||||
unsigned int lastStats = 0;
|
||||
unsigned int seek_pos = 0;//seek position in ms
|
||||
conn.setBlocking(false);//do not block on conn.spool() when no data is available
|
||||
|
||||
while (conn.connected()){
|
||||
//only parse input if available or not yet init'ed
|
||||
if (conn.spool() || conn.Received().size()){
|
||||
if (!inited){
|
||||
if (conn.Received().size() || conn.spool()){
|
||||
if (HTTP_R.Read(conn.Received().get())){
|
||||
#if DEBUG >= 4
|
||||
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
|
||||
|
@ -54,8 +54,7 @@ namespace Connector_HTTP{
|
|||
ready4data = true;
|
||||
HTTP_R.Clean(); //clean for any possible next requests
|
||||
}
|
||||
}else{
|
||||
usleep(1000);//sleep 1ms
|
||||
}
|
||||
}
|
||||
if (ready4data){
|
||||
if (!inited){
|
||||
|
@ -83,10 +82,9 @@ namespace Connector_HTTP{
|
|||
ss.SendNow("p\n");
|
||||
inited = true;
|
||||
}
|
||||
unsigned int now = time(0);
|
||||
unsigned int now = Util::epoch();
|
||||
if (now != lastStats){
|
||||
lastStats = now;
|
||||
ss.Send("S ");
|
||||
ss.SendNow(conn.getStats("HTTP_Progressive").c_str());
|
||||
}
|
||||
if (ss.spool()){
|
||||
|
@ -98,19 +96,18 @@ namespace Connector_HTTP{
|
|||
HTTP_S.protocol = "HTTP/1.0";
|
||||
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));//no SetBody = unknown length - this is intentional, we will stream the entire file
|
||||
conn.SendNow(FLV::Header, 13);//write FLV header
|
||||
static FLV::Tag tmp;
|
||||
//write metadata
|
||||
tmp.DTSCMetaInit(Strm);
|
||||
conn.SendNow(tmp.data, tmp.len);
|
||||
tag.DTSCMetaInit(Strm);
|
||||
conn.SendNow(tag.data, tag.len);
|
||||
//write video init data, if needed
|
||||
if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){
|
||||
tmp.DTSCVideoInit(Strm);
|
||||
conn.SendNow(tmp.data, tmp.len);
|
||||
tag.DTSCVideoInit(Strm);
|
||||
conn.SendNow(tag.data, tag.len);
|
||||
}
|
||||
//write audio init data, if needed
|
||||
if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){
|
||||
tmp.DTSCAudioInit(Strm);
|
||||
conn.SendNow(tmp.data, tmp.len);
|
||||
tag.DTSCAudioInit(Strm);
|
||||
conn.SendNow(tag.data, tag.len);
|
||||
}
|
||||
progressive_has_sent_header = true;
|
||||
#if DEBUG >= 1
|
||||
|
@ -120,12 +117,13 @@ namespace Connector_HTTP{
|
|||
tag.DTSCLoader(Strm);
|
||||
conn.SendNow(tag.data, tag.len);//write the tag contents
|
||||
}
|
||||
}else{
|
||||
Util::sleep(1);
|
||||
}
|
||||
if (!ss.connected()){break;}
|
||||
}
|
||||
}
|
||||
conn.close();
|
||||
ss.Send("S ");
|
||||
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
|
||||
ss.close();
|
||||
#if DEBUG >= 1
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
#include <mist/config.h>
|
||||
#include <mist/socket.h>
|
||||
#include <mist/stream.h>
|
||||
#include <mist/timing.h>
|
||||
|
||||
/// Contains the main code for the RAW connector.
|
||||
/// Expects a single commandline argument telling it which stream to connect to,
|
||||
|
@ -22,8 +23,8 @@ int main(int argc, char ** argv) {
|
|||
std::cout << "Could not open stream " << conf.getString("stream_name") << std::endl;
|
||||
return 1;
|
||||
}
|
||||
unsigned int lastStats = 0;
|
||||
unsigned int started = time(0);
|
||||
long long int lastStats = 0;
|
||||
long long int started = Util::epoch();
|
||||
while(std::cout.good()){
|
||||
if (S.spool()){
|
||||
while (S.Received().size()){
|
||||
|
@ -31,18 +32,18 @@ int main(int argc, char ** argv) {
|
|||
S.Received().get().clear();
|
||||
}
|
||||
}else{
|
||||
usleep(10000);//sleep 10ms if no data
|
||||
Util::sleep(10);//sleep 10ms if no data
|
||||
}
|
||||
unsigned int now = time(0);
|
||||
unsigned int now = Util::epoch();
|
||||
if (now != lastStats){
|
||||
lastStats = now;
|
||||
std::stringstream st;
|
||||
st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
|
||||
st << "S localhost RAW " << (Util::epoch() - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
|
||||
S.SendNow(st.str().c_str());
|
||||
}
|
||||
}
|
||||
std::stringstream st;
|
||||
st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
|
||||
st << "S localhost RAW " << (Util::epoch() - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
|
||||
S.SendNow(st.str().c_str());
|
||||
S.close();
|
||||
return 0;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include <mist/amf.h>
|
||||
#include <mist/rtmpchunks.h>
|
||||
#include <mist/stream.h>
|
||||
#include <mist/timing.h>
|
||||
|
||||
/// Holds all functions and data unique to the RTMP Connector
|
||||
namespace Connector_RTMP{
|
||||
|
@ -38,7 +39,7 @@ namespace Connector_RTMP{
|
|||
Socket::Connection Socket; ///< Socket connected to user
|
||||
Socket::Connection SS; ///< Socket connected to server
|
||||
std::string streamname; ///< Stream that will be opened
|
||||
void parseChunk(std::string & buffer);///< Parses a single RTMP chunk.
|
||||
void parseChunk(Socket::Buffer & buffer);///< Parses a single RTMP chunk.
|
||||
void sendCommand(AMF::Object & amfreply, int messagetype, int stream_id);///< Sends a RTMP command either in AMF or AMF3 mode.
|
||||
void parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id);///< Parses a single AMF command message.
|
||||
int Connector_RTMP(Socket::Connection conn);
|
||||
|
@ -52,13 +53,13 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
FLV::Tag tag, init_tag;
|
||||
DTSC::Stream Strm;
|
||||
|
||||
while (!Socket.Received().available(1537) && Socket.connected()){Socket.spool(); usleep(5000);}
|
||||
while (!Socket.Received().available(1537) && Socket.connected()){Socket.spool(); Util::sleep(5);}
|
||||
RTMPStream::handshake_in = Socket.Received().remove(1537);
|
||||
RTMPStream::rec_cnt += 1537;
|
||||
|
||||
if (RTMPStream::doHandshake()){
|
||||
Socket.SendNow(RTMPStream::handshake_out);
|
||||
while (!Socket.Received().available(1536) && Socket.connected()){Socket.spool(); usleep(5000);}
|
||||
while (!Socket.Received().available(1536) && Socket.connected()){Socket.spool(); Util::sleep(5);}
|
||||
Socket.Received().remove(1536);
|
||||
RTMPStream::rec_cnt += 1536;
|
||||
#if DEBUG >= 4
|
||||
|
@ -72,14 +73,14 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
}
|
||||
|
||||
unsigned int lastStats = 0;
|
||||
bool firsttime = true;
|
||||
|
||||
while (Socket.connected()){
|
||||
if (Socket.spool() || Socket.Received().size()){
|
||||
while (Socket.Received().size()){
|
||||
parseChunk(Socket.Received().get());
|
||||
}
|
||||
if (Socket.spool() || firsttime){
|
||||
parseChunk(Socket.Received());
|
||||
firsttime = false;
|
||||
}else{
|
||||
usleep(1000);//sleep 1ms to prevent high CPU usage
|
||||
Util::sleep(1);//sleep 1ms to prevent high CPU usage
|
||||
}
|
||||
if (ready4data){
|
||||
if (!inited){
|
||||
|
@ -100,10 +101,9 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
inited = true;
|
||||
}
|
||||
if (inited && !nostats){
|
||||
unsigned int now = time(0);
|
||||
long long int now = Util::epoch();
|
||||
if (now != lastStats){
|
||||
lastStats = now;
|
||||
SS.Send("S ");
|
||||
SS.SendNow(Socket.getStats("RTMP").c_str());
|
||||
}
|
||||
}
|
||||
|
@ -172,7 +172,6 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
}
|
||||
}
|
||||
Socket.close();
|
||||
SS.Send("S ");
|
||||
SS.SendNow(Socket.getStats("RTMP").c_str());
|
||||
SS.close();
|
||||
#if DEBUG >= 1
|
||||
|
@ -192,7 +191,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
}//Connector_RTMP
|
||||
|
||||
/// Tries to get and parse one RTMP chunk at a time.
|
||||
void Connector_RTMP::parseChunk(std::string & inbuffer){
|
||||
void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){
|
||||
//for DTSC conversion
|
||||
static JSON::Value meta_out;
|
||||
static std::stringstream prebuffer; // Temporary buffer before sending real data
|
||||
|
@ -240,10 +239,6 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
|
|||
RTMPStream::snd_window_at = RTMPStream::snd_cnt;
|
||||
break;
|
||||
case 4:{
|
||||
#if DEBUG >= 4
|
||||
short int ucmtype = ntohs(*(short int*)next.data.c_str());
|
||||
fprintf(stderr, "CTRL: User control message %hi\n", ucmtype);
|
||||
#endif
|
||||
//2 bytes event type, rest = event data
|
||||
//types:
|
||||
//0 = stream begin, 4 bytes ID
|
||||
|
@ -254,6 +249,19 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
|
|||
//6 = pingrequest, 4 bytes data
|
||||
//7 = pingresponse, 4 bytes data
|
||||
//we don't need to process this
|
||||
#if DEBUG >= 4
|
||||
short int ucmtype = ntohs(*(short int*)next.data.c_str());
|
||||
switch (ucmtype){
|
||||
case 0: fprintf(stderr, "CTRL: UCM StreamBegin %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break;
|
||||
case 1: fprintf(stderr, "CTRL: UCM StreamEOF %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break;
|
||||
case 2: fprintf(stderr, "CTRL: UCM StreamDry %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break;
|
||||
case 3: fprintf(stderr, "CTRL: UCM SetBufferLength %i %i\n", ntohl(*((int*)(next.data.c_str()+2))), ntohl(*((int*)(next.data.c_str()+6)))); break;
|
||||
case 4: fprintf(stderr, "CTRL: UCM StreamIsRecorded %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break;
|
||||
case 6: fprintf(stderr, "CTRL: UCM PingRequest %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break;
|
||||
case 7: fprintf(stderr, "CTRL: UCM PingResponse %i\n", ntohl(*((int*)(next.data.c_str()+2)))); break;
|
||||
default: fprintf(stderr, "CTRL: UCM Unknown (%hi)\n", ucmtype); break;
|
||||
}
|
||||
#endif
|
||||
} break;
|
||||
case 5://window size of other end
|
||||
#if DEBUG >= 4
|
||||
|
@ -510,11 +518,55 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
|||
play_msgtype = messagetype;
|
||||
play_streamid = stream_id;
|
||||
stream_inited = false;
|
||||
|
||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||
amfreply.addContent(AMF::Object(""));//info
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Seek.Notify"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("description", "Seeking to the specified time"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||
sendCommand(amfreply, play_msgtype, play_streamid);
|
||||
SS.Send("s ");
|
||||
SS.Send(JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString().c_str());
|
||||
SS.Send("\n");
|
||||
return;
|
||||
}//seek
|
||||
if ((amfdata.getContentP(0)->StrValue() == "pauseRaw") || (amfdata.getContentP(0)->StrValue() == "pause")){
|
||||
if (amfdata.getContentP(3)->NumValue()){
|
||||
SS.Send("q\n");//quit playing
|
||||
//send a status reply
|
||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||
amfreply.addContent(AMF::Object(""));//info
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Pause.Notify"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("description", "Pausing playback"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||
sendCommand(amfreply, play_msgtype, play_streamid);
|
||||
}else{
|
||||
SS.Send("p\n");//start playing
|
||||
//send a status reply
|
||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||
amfreply.addContent(AMF::Object(""));//info
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Unpause.Notify"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("description", "Resuming playback"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||
sendCommand(amfreply, play_msgtype, play_streamid);
|
||||
}
|
||||
return;
|
||||
}//seek
|
||||
|
||||
#if DEBUG >= 2
|
||||
fprintf(stderr, "AMF0 command not processed! :(\n");
|
||||
|
|
|
@ -11,14 +11,12 @@
|
|||
#include <cstdlib>
|
||||
#include <queue>
|
||||
#include <cmath>
|
||||
#include <ctime>
|
||||
#include <cstdio>
|
||||
#include <climits>
|
||||
#include <cstring>
|
||||
#include <unistd.h>
|
||||
#include <getopt.h>
|
||||
#include <set>
|
||||
#include <sys/time.h>
|
||||
#include <sys/wait.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
|
@ -30,6 +28,7 @@
|
|||
#include <mist/http_parser.h>
|
||||
#include <mist/procs.h>
|
||||
#include <mist/auth.h>
|
||||
#include <mist/timing.h>
|
||||
#include "server.html.h"
|
||||
|
||||
#define UPLINK_INTERVAL 30
|
||||
|
@ -87,7 +86,7 @@ void Log(std::string kind, std::string message){
|
|||
if ((*it)[2] == message){return;}
|
||||
}
|
||||
JSON::Value m;
|
||||
m.append((long long int)time(0));
|
||||
m.append(Util::epoch());
|
||||
m.append(kind);
|
||||
m.append(message);
|
||||
Storage["log"].append(m);
|
||||
|
@ -261,16 +260,15 @@ void startStream(std::string name, JSON::Value & data){
|
|||
}
|
||||
|
||||
void CheckStats(JSON::Value & stats){
|
||||
unsigned int currTime = time(0);
|
||||
long long int currTime = Util::epoch();
|
||||
for (JSON::ObjIter jit = stats.ObjBegin(); jit != stats.ObjEnd(); jit++){
|
||||
if (currTime - lastBuffer[jit->first] > 120){
|
||||
stats.removeMember(jit->first);
|
||||
return;
|
||||
}else{
|
||||
if (jit->second.isMember("curr") && jit->second["curr"].size() > 0){
|
||||
long long int nowtime = time(0);
|
||||
for (JSON::ObjIter u_it = jit->second["curr"].ObjBegin(); u_it != jit->second["curr"].ObjEnd(); ++u_it){
|
||||
if (u_it->second.isMember("now") && u_it->second["now"].asInt() < nowtime - 3){
|
||||
if (u_it->second.isMember("now") && u_it->second["now"].asInt() < currTime - 3){
|
||||
jit->second["log"].append(u_it->second);
|
||||
jit->second["curr"].removeMember(u_it->first);
|
||||
if (!jit->second["curr"].size()){break;}
|
||||
|
@ -283,7 +281,7 @@ void CheckStats(JSON::Value & stats){
|
|||
}
|
||||
|
||||
void CheckAllStreams(JSON::Value & data){
|
||||
unsigned int currTime = time(0);
|
||||
long long int currTime = Util::epoch();
|
||||
bool changed = false;
|
||||
for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){
|
||||
if (!Util::Procs::isActive(jit->first)){
|
||||
|
@ -385,14 +383,14 @@ int main(int argc, char ** argv){
|
|||
while (API_Socket.connected() && conf.is_active){
|
||||
usleep(10000); //sleep for 10 ms - prevents 100% CPU time
|
||||
|
||||
if (time(0) - processchecker > 10){
|
||||
processchecker = time(0);
|
||||
if (Util::epoch() - processchecker > 10){
|
||||
processchecker = Util::epoch();
|
||||
Connector::CheckProtocols(Connector::Storage["config"]["protocols"]);
|
||||
Connector::CheckAllStreams(Connector::Storage["streams"]);
|
||||
Connector::CheckStats(Connector::Storage["statistics"]);
|
||||
}
|
||||
if (conf.getBool("uplink") && time(0) - lastuplink > UPLINK_INTERVAL){
|
||||
lastuplink = time(0);
|
||||
if (conf.getBool("uplink") && Util::epoch() - lastuplink > UPLINK_INTERVAL){
|
||||
lastuplink = Util::epoch();
|
||||
bool gotUplink = false;
|
||||
if (users.size() > 0){
|
||||
for( std::vector< Connector::ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) {
|
||||
|
@ -449,7 +447,7 @@ int main(int argc, char ** argv){
|
|||
it->Received().get().clear();
|
||||
if (Request.isMember("buffer")){
|
||||
std::string thisbuffer = Request["buffer"];
|
||||
Connector::lastBuffer[thisbuffer] = time(0);
|
||||
Connector::lastBuffer[thisbuffer] = Util::epoch();
|
||||
if (Request.isMember("meta")){
|
||||
Connector::Storage["statistics"][thisbuffer]["meta"] = Request["meta"];
|
||||
}
|
||||
|
@ -470,7 +468,7 @@ int main(int argc, char ** argv){
|
|||
std::string thisfile = Request["vod"]["filename"];
|
||||
for (JSON::ObjIter oit = Connector::Storage["streams"].ObjBegin(); oit != Connector::Storage["streams"].ObjEnd(); ++oit){
|
||||
if (oit->second["channel"]["URL"].asString() == thisfile){
|
||||
Connector::lastBuffer[oit->first] = time(0);
|
||||
Connector::lastBuffer[oit->first] = Util::epoch();
|
||||
if (Request["vod"].isMember("meta")){
|
||||
Connector::Storage["statistics"][oit->first]["meta"] = Request["vod"]["meta"];
|
||||
}
|
||||
|
@ -563,7 +561,7 @@ int main(int argc, char ** argv){
|
|||
Response["config"] = Connector::Storage["config"];
|
||||
Response["streams"] = Connector::Storage["streams"];
|
||||
//add required data to the current unix time to the config, for syncing reasons
|
||||
Response["config"]["time"] = (long long int)time(0);
|
||||
Response["config"]["time"] = Util::epoch();
|
||||
if (!Response["config"].isMember("serverid")){Response["config"]["serverid"] = "";}
|
||||
//sent any available logs and statistics
|
||||
Response["log"] = Connector::Storage["log"];
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include <mist/json.h>
|
||||
#include <mist/config.h>
|
||||
#include <mist/socket.h>
|
||||
#include <mist/timing.h>
|
||||
|
||||
/// Copy of stats from buffer_user.cpp
|
||||
class Stats{
|
||||
|
@ -47,14 +48,6 @@ class Stats{
|
|||
};
|
||||
};
|
||||
|
||||
|
||||
/// Gets the current system time in milliseconds.
|
||||
long long int getNowMS(){
|
||||
timeval t;
|
||||
gettimeofday(&t, 0);
|
||||
return t.tv_sec * 1000 + t.tv_usec/1000;
|
||||
}//getNowMS
|
||||
|
||||
int main(int argc, char** argv){
|
||||
Util::Config conf(argv[0], PACKAGE_VERSION);
|
||||
conf.addOption("filename", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the file to write to stdout.\"}"));
|
||||
|
@ -70,7 +63,7 @@ int main(int argc, char** argv){
|
|||
pausemark["time"] = (long long int)0;
|
||||
|
||||
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
||||
int lasttime = time(0);//time last packet was sent
|
||||
int lasttime = Util::epoch();//time last packet was sent
|
||||
|
||||
//send the header
|
||||
{
|
||||
|
@ -91,7 +84,7 @@ int main(int argc, char** argv){
|
|||
long long bench = 0;//for benchmarking
|
||||
Stats sts;
|
||||
|
||||
while (in_out.connected() && std::cin.good() && std::cout.good() && (time(0) - lasttime < 60)){
|
||||
while (in_out.connected() && std::cin.good() && std::cout.good() && (Util::epoch() - lasttime < 60)){
|
||||
if (in_out.spool()){
|
||||
while (in_out.Received().size()){
|
||||
//delete anything that doesn't end with a newline
|
||||
|
@ -121,8 +114,8 @@ int main(int argc, char** argv){
|
|||
json_sts["vod"]["host"] = sts.host;
|
||||
json_sts["vod"]["connector"] = sts.connector;
|
||||
json_sts["vod"]["filename"] = conf.getString("filename");
|
||||
json_sts["vod"]["now"] = (long long int)time(0);
|
||||
json_sts["vod"]["start"] = (long long int)(time(0) - sts.conntime);
|
||||
json_sts["vod"]["now"] = Util::epoch();
|
||||
json_sts["vod"]["start"] = Util::epoch() - sts.conntime;
|
||||
if (!meta_sent){
|
||||
json_sts["vod"]["meta"] = meta;
|
||||
meta_sent = true;
|
||||
|
@ -135,12 +128,15 @@ int main(int argc, char** argv){
|
|||
case 's':{ //second-seek
|
||||
int ms = JSON::Value(in_out.Received().get().substr(2)).asInt();
|
||||
bool ret = source.seek_time(ms);
|
||||
lastTime = 0;
|
||||
} break;
|
||||
case 'f':{ //frame-seek
|
||||
bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt());
|
||||
lastTime = 0;
|
||||
} break;
|
||||
case 'p':{ //play
|
||||
playing = -1;
|
||||
lastTime = 0;
|
||||
in_out.setBlocking(false);
|
||||
} break;
|
||||
case 'o':{ //once-play
|
||||
|
@ -150,7 +146,7 @@ int main(int argc, char** argv){
|
|||
#if DEBUG >= 4
|
||||
std::cerr << "Playing one keyframe" << std::endl;
|
||||
#endif
|
||||
bench = getNowMS();
|
||||
bench = Util::getMS();
|
||||
} break;
|
||||
case 'q':{ //quit-playing
|
||||
playing = 0;
|
||||
|
@ -162,15 +158,17 @@ int main(int argc, char** argv){
|
|||
}
|
||||
}
|
||||
if (playing != 0){
|
||||
now = getNowMS();
|
||||
if (playing > 0 || meta["video"]["keyms"].asInt() <= now-lastTime) {
|
||||
now = Util::getMS();
|
||||
source.seekNext();
|
||||
if (source.getJSON().isMember("keyframe")){
|
||||
if (playing == -1 && meta["video"]["keyms"].asInt() > now-lastTime) {
|
||||
Util::sleep(meta["video"]["keyms"].asInt()-(now-lastTime));
|
||||
}
|
||||
lastTime = now;
|
||||
if (playing > 0){--playing;}
|
||||
if (playing == 0){
|
||||
#if DEBUG >= 4
|
||||
std::cerr << "Sending pause_marker (" << (getNowMS() - bench) << "ms)" << std::endl;
|
||||
std::cerr << "Sending pause_marker (" << (Util::getMS() - bench) << "ms)" << std::endl;
|
||||
#endif
|
||||
pausemark["time"] = (long long int)now;
|
||||
pausemark.toPacked();
|
||||
|
@ -179,7 +177,7 @@ int main(int argc, char** argv){
|
|||
}
|
||||
}
|
||||
if (playing != 0){
|
||||
lasttime = time(0);
|
||||
lasttime = Util::epoch();
|
||||
//insert proper header for this type of data
|
||||
in_out.Send("DTPD");
|
||||
//insert the packet length
|
||||
|
@ -188,10 +186,7 @@ int main(int argc, char** argv){
|
|||
in_out.SendNow(source.getPacket());
|
||||
}
|
||||
}else{
|
||||
usleep((meta["video"]["keyms"].asInt()-(now-lastTime))*1000);
|
||||
}
|
||||
}else{
|
||||
usleep(10000);//sleep 10ms
|
||||
Util::sleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue