diff --git a/src/buffer.cpp b/src/buffer.cpp index f651840a..a79918a7 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -33,6 +33,7 @@ namespace Buffer{ void handleStats(void * empty){ if (empty != 0){return;} + std::string double_newline = "\n\n"; Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); while (buffer_running){ usleep(1000000); //sleep one second @@ -40,7 +41,8 @@ namespace Buffer{ StatsSocket = Socket::Connection("/tmp/mist/statistics", true); } if (StatsSocket.connected()){ - StatsSocket.Send(Stream::get()->getStats()+"\n\n"); + StatsSocket.Send(Stream::get()->getStats()); + StatsSocket.Send(double_newline); StatsSocket.flush(); } } @@ -140,7 +142,7 @@ namespace Buffer{ inBuffer.append(charBuffer, charCount); } }else{ - usleep(std::min(14999LL, lastTime - (now - timeDiff)) * 1000); + usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000); } } buffer_running = false; diff --git a/src/buffer_stream.cpp b/src/buffer_stream.cpp index ebbbe6f9..1956a28b 100644 --- a/src/buffer_stream.cpp +++ b/src/buffer_stream.cpp @@ -45,7 +45,8 @@ Buffer::Stream::~Stream(){ } /// Calculate and return the current statistics in JSON format. -std::string Buffer::Stream::getStats(){ +std::string & Buffer::Stream::getStats(){ + static std::string ret; unsigned int now = time(0); unsigned int tot_up = 0, tot_down = 0, tot_count = 0; stats_mutex.lock(); @@ -64,7 +65,7 @@ std::string Buffer::Stream::getStats(){ Storage["meta"] = Strm->metadata; if (Storage["meta"].isMember("audio")){Storage["meta"]["audio"].removeMember("init");} if (Storage["meta"].isMember("video")){Storage["meta"]["video"].removeMember("init");} - std::string ret = Storage.toString(); + ret = Storage.toString(); Storage["log"].null(); stats_mutex.unlock(); return ret; diff --git a/src/buffer_stream.h b/src/buffer_stream.h index a4d340e2..81784d39 100644 --- a/src/buffer_stream.h +++ b/src/buffer_stream.h @@ -14,7 +14,7 @@ namespace Buffer{ /// Get a reference to this Stream object. static Stream * get(); /// Get the current statistics in JSON format. - std::string getStats(); + std::string & getStats(); /// Get a new DTSC::Ring object for a user. DTSC::Ring * getRing(); /// Drop a DTSC::Ring object. diff --git a/src/conn_http.cpp b/src/conn_http.cpp index 09af0847..edcec046 100644 --- a/src/conn_http.cpp +++ b/src/conn_http.cpp @@ -103,7 +103,7 @@ namespace Connector_HTTP{ /// Handles internal requests. void Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){ - std::string url = H.url; + std::string url = H.getUrl(); if (url == "/crossdomain.xml"){ H.Clean(); @@ -114,7 +114,7 @@ namespace Connector_HTTP{ return; }//crossdomain.xml - if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js") || (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(H.url.length() - 3, 3) == ".js")){ + if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js") || (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js")){ std::string streamname; if (url.substr(0, 6) == "/info_"){ streamname = url.substr(6, url.length() - 9); @@ -293,24 +293,25 @@ namespace Connector_HTTP{ /// - dynamic (request fed from http_dynamic connector) /// - progressive (request fed from http_progressive connector) std::string getHTTPType(HTTP::Parser & H){ - if ((H.url.find("f4m") != std::string::npos) || ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos))){ - std::string streamname = H.url.substr(1,H.url.find("/",1)-1); + std::string url = H.getUrl(); + if ((url.find("f4m") != std::string::npos) || ((url.find("Seg") != std::string::npos) && (url.find("Frag") != std::string::npos))){ + std::string streamname = url.substr(1,url.find("/",1)-1); Util::Stream::sanitizeName(streamname); H.SetVar("stream", streamname); return "dynamic"; } - if (H.url.length() > 4){ - std::string ext = H.url.substr(H.url.length() - 4, 4); + if (url.length() > 4){ + std::string ext = url.substr(url.length() - 4, 4); if (ext == ".flv" || ext == ".mp3"){ - std::string streamname = H.url.substr(1,H.url.length() - 5); + std::string streamname = url.substr(1,url.length() - 5); Util::Stream::sanitizeName(streamname); H.SetVar("stream", streamname); return "progressive"; } } - if (H.url == "/crossdomain.xml"){return "internal";} - if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";} - if (H.url.length() > 9 && H.url.substr(0, 6) == "/info_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";} + if (url == "/crossdomain.xml"){return "internal";} + if (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js"){return "internal";} + if (url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js"){return "internal";} return "none"; } @@ -324,7 +325,7 @@ namespace Connector_HTTP{ if (Client.Read(conn->Received())){ std::string handler = getHTTPType(Client); #if DEBUG >= 4 - std::cout << "Received request: " << Client.url << " => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl; + std::cout << "Received request: " << Client.getUrl() << " => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl; #endif if (handler == "none" || handler == "internal"){ if (handler == "internal"){ diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index 85ce94fa..d74b6adc 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -113,6 +113,7 @@ namespace Connector_HTTP{ /// Main function for Connector_HTTP_Dynamic int Connector_HTTP_Dynamic(Socket::Connection conn){ std::string FlashBuf; + int flashbuf_nonempty = 0; FLV::Tag tmp;//temporary tag, for init data std::queue Flash_FragBuffer;//Fragment buffer @@ -139,11 +140,27 @@ namespace Connector_HTTP{ if (conn.spool()){ if (HTTP_R.Read(conn.Received())){ #if DEBUG >= 4 - std::cout << "Received request: " << HTTP_R.url << std::endl; + std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; #endif conn.setHost(HTTP_R.GetHeader("X-Origin")); if (HTTP_R.url.find("f4m") == std::string::npos){ streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1); + if (!ss){ + ss = Util::Stream::getStream(streamname); + if (!ss.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + ss.close(); + HTTP_S.Clean(); + HTTP_S.SetBody("No such stream is available on the system. Please try again.\n"); + conn.Send(HTTP_S.BuildResponse("404", "Not found")); + ready4data = false; + continue; + } + ss.setBlocking(false); + inited = true; + } Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 ); Quality = Quality.substr(0, Quality.find("Seg")); temp = HTTP_R.url.find("Seg") + 3; @@ -153,12 +170,27 @@ namespace Connector_HTTP{ #if DEBUG >= 4 printf( "Quality: %s, Seg %d Frag %d\n", Quality.c_str(), Segment, ReqFragment); #endif - ss.Send("f " + JSON::Value((long long int)ReqFragment) + "\no \n"); + std::stringstream sstream; + sstream << "f " << ReqFragment << "\no \n"; + ss.Send(sstream.str().c_str()); ss.flush(); Flash_RequestPending++; }else{ streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1); - pending_manifest = true; + if (!Strm.metadata.isNull()){ + HTTP_S.Clean(); + HTTP_S.SetHeader("Content-Type","text/xml"); + HTTP_S.SetHeader("Cache-Control","no-cache"); + std::string manifest = BuildManifest(streamname, Strm.metadata); + HTTP_S.SetBody(manifest); + conn.Send(HTTP_S.BuildResponse("200", "OK")); + #if DEBUG >= 3 + printf("Sent manifest\n"); + #endif + pending_manifest = false; + }else{ + pending_manifest = true; + } } ready4data = true; HTTP_R.Clean(); //clean for any possible next requests @@ -183,6 +215,7 @@ namespace Connector_HTTP{ ready4data = false; continue; } + ss.setBlocking(false); #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif @@ -202,7 +235,8 @@ namespace Connector_HTTP{ unsigned int now = time(0); if (now != lastStats){ lastStats = now; - ss.Send("S "+conn.getStats("HTTP_Dynamic")); + ss.Send("S "); + ss.Send(conn.getStats("HTTP_Dynamic").c_str()); } if (ss.spool() || ss.Received() != ""){ if (Strm.parsePacket(ss.Received())){ @@ -216,7 +250,9 @@ namespace Connector_HTTP{ } Strm.metadata["lasttime"] = Strm.getPacket(0)["time"]; } - tag.DTSCLoader(Strm); + if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){ + tag.DTSCLoader(Strm); + } if (pending_manifest){ HTTP_S.Clean(); HTTP_S.SetHeader("Content-Type","text/xml"); @@ -229,17 +265,18 @@ namespace Connector_HTTP{ #endif pending_manifest = false; } - if (Strm.getPacket(0).isMember("keyframe")){ - if (FlashBuf != ""){ + if (Strm.getPacket(0).isMember("keyframe") || Strm.getPacket(0)["datatype"].asString() == "pause_marker"){ + if (flashbuf_nonempty){ Flash_FragBuffer.push(FlashBuf); while (Flash_FragBuffer.size() > 2){ Flash_FragBuffer.pop(); } #if DEBUG >= 4 - fprintf(stderr, "Received a fragment. Now %i in buffer.\n", (int)Flash_FragBuffer.size()); + fprintf(stderr, "Received a %s fragment of %i packets. Now %i in buffer.\n", Strm.getPacket(0)["datatype"].asString().c_str(), flashbuf_nonempty, (int)Flash_FragBuffer.size()); #endif } FlashBuf.clear(); + flashbuf_nonempty = 0; //fill buffer with init data, if needed. if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ tmp.DTSCAudioInit(Strm); @@ -250,14 +287,31 @@ namespace Connector_HTTP{ FlashBuf.append(tmp.data, tmp.len); } } - FlashBuf.append(tag.data, tag.len); + if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){ + ++flashbuf_nonempty; + FlashBuf.append(tag.data, tag.len); + } + }else{ + if (pending_manifest && !Strm.metadata.isNull()){ + HTTP_S.Clean(); + HTTP_S.SetHeader("Content-Type","text/xml"); + HTTP_S.SetHeader("Cache-Control","no-cache"); + std::string manifest = BuildManifest(streamname, Strm.metadata); + HTTP_S.SetBody(manifest); + conn.Send(HTTP_S.BuildResponse("200", "OK")); + #if DEBUG >= 3 + printf("Sent manifest\n"); + #endif + pending_manifest = false; + } } } if (!ss.connected()){break;} } } conn.close(); - ss.Send("S "+conn.getStats("HTTP_Dynamic")); + ss.Send("S "); + ss.Send(conn.getStats("HTTP_Dynamic").c_str()); ss.flush(); ss.close(); #if DEBUG >= 1 diff --git a/src/conn_http_progressive.cpp b/src/conn_http_progressive.cpp index 81f7e955..c626a471 100644 --- a/src/conn_http_progressive.cpp +++ b/src/conn_http_progressive.cpp @@ -35,7 +35,7 @@ namespace Connector_HTTP{ FLV::Tag tag;///< Temporary tag buffer. unsigned int lastStats = 0; - unsigned int seek_pos = 0;//seek position in milliseconds + unsigned int seek_pos = 0;//seek position in ms conn.setBlocking(false);//do not block on conn.spool() when no data is available while (conn.connected()){ @@ -43,14 +43,14 @@ namespace Connector_HTTP{ if (conn.spool()){ if (HTTP_R.Read(conn.Received())){ #if DEBUG >= 4 - std::cout << "Received request: " << HTTP_R.url << std::endl; + std::cout << "Received request: " << HTTP_R.getUrl() << std::endl; #endif conn.setHost(HTTP_R.GetHeader("X-Origin")); //we assume the URL is the stream name with a 3 letter extension - streamname = HTTP_R.url; + streamname = HTTP_R.getUrl().substr(1); size_t extDot = streamname.rfind('.'); if (extDot != std::string::npos){streamname.resize(extDot);};//strip the extension - seek_pos = 1000 * atof(HTTP_R.GetVar("start").c_str());//seconds to ms + seek_pos = atoi(HTTP_R.GetVar("start").c_str()) * 1000;//seconds to ms ready4data = true; HTTP_R.Clean(); //clean for any possible next requests }else{ @@ -65,7 +65,7 @@ namespace Connector_HTTP{ ss = Util::Stream::getStream(streamname); if (!ss.connected()){ #if DEBUG >= 1 - fprintf(stderr, "Could not connect to server!\n"); + fprintf(stderr, "Could not connect to server for %s!\n", streamname.c_str()); #endif ss.close(); HTTP_S.Clean(); @@ -77,7 +77,7 @@ namespace Connector_HTTP{ if (seek_pos){ std::stringstream cmd; cmd << "s " << seek_pos << "\n"; - ss.Send(cmd.str()); + ss.Send(cmd.str().c_str()); } #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); @@ -89,7 +89,8 @@ namespace Connector_HTTP{ unsigned int now = time(0); if (now != lastStats){ lastStats = now; - ss.Send("S "+conn.getStats("HTTP_Progressive")); + ss.Send("S "); + ss.Send(conn.getStats("HTTP_Progressive").c_str()); } if (ss.spool() || ss.Received() != ""){ if (Strm.parsePacket(ss.Received())){ @@ -100,34 +101,35 @@ namespace Connector_HTTP{ //HTTP_S.SetHeader("Transfer-Encoding", "chunked"); HTTP_S.protocol = "HTTP/1.0"; conn.Send(HTTP_S.BuildResponse("200", "OK"));//no SetBody = unknown length - this is intentional, we will stream the entire file - conn.Send(std::string(FLV::Header, 13));//write FLV header + conn.Send(FLV::Header, 13);//write FLV header static FLV::Tag tmp; //write metadata tmp.DTSCMetaInit(Strm); - conn.Send(std::string(tmp.data, tmp.len)); + conn.Send(tmp.data, tmp.len); //write video init data, if needed if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){ tmp.DTSCVideoInit(Strm); - conn.Send(std::string(tmp.data, tmp.len)); + conn.Send(tmp.data, tmp.len); } //write audio init data, if needed if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ tmp.DTSCAudioInit(Strm); - conn.Send(std::string(tmp.data, tmp.len)); + conn.Send(tmp.data, tmp.len); } progressive_has_sent_header = true; #if DEBUG >= 1 fprintf(stderr, "Sent progressive FLV header\n"); #endif } - conn.Send(std::string(tag.data, tag.len));//write the tag contents + conn.Send(tag.data, tag.len);//write the tag contents } } if (!ss.connected()){break;} } } conn.close(); - ss.Send("S "+conn.getStats("HTTP_Dynamic")); + ss.Send("S "); + ss.Send(conn.getStats("HTTP_Dynamic").c_str()); ss.flush(); ss.close(); #if DEBUG >= 1 diff --git a/src/conn_raw.cpp b/src/conn_raw.cpp index 860af85c..f231ab8d 100644 --- a/src/conn_raw.cpp +++ b/src/conn_raw.cpp @@ -36,12 +36,14 @@ int main(int argc, char ** argv) { lastStats = now; std::stringstream st; st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; - S.Send(st.str()); + std::string tmp = st.str(); + S.Send(tmp); } } std::stringstream st; st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; - S.Send(st.str()); + std::string tmp = st.str(); + S.Send(tmp); S.flush(); S.close(); return 0; diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp index e68c1a5b..4c363a87 100644 --- a/src/conn_rtmp.cpp +++ b/src/conn_rtmp.cpp @@ -32,6 +32,9 @@ namespace Connector_RTMP{ int play_streamid = -1; int play_msgtype = -1; + //generic state keeping + bool stream_inited = false;///true if init data for audio/video was sent + Socket::Connection Socket; ///< Socket connected to user Socket::Connection SS; ///< Socket connected to server std::string streamname; ///< Stream that will be opened @@ -48,7 +51,6 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ Socket.setBlocking(false); FLV::Tag tag, init_tag; DTSC::Stream Strm; - bool stream_inited = false;//true if init data for audio/video was sent while (Socket.Received().size() < 1537 && Socket.connected()){Socket.spool(); usleep(5000);} RTMPStream::handshake_in = Socket.Received().substr(0, 1537); @@ -101,7 +103,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ unsigned int now = time(0); if (now != lastStats){ lastStats = now; - SS.Send("S "+Socket.getStats("RTMP")); + SS.Send("S "); + SS.Send(Socket.getStats("RTMP").c_str()); } } if (SS.spool()){ @@ -167,7 +170,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } } Socket.close(); - SS.Send("S "+Socket.getStats("RTMP")); + SS.Send("S "); + SS.Send(Socket.getStats("RTMP").c_str()); SS.flush(); SS.close(); #if DEBUG >= 1 @@ -278,7 +282,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ if (counter > 8){ sending = true; SS.Send(meta_out.toNetPacked()); - SS.Send(prebuffer.str());//write buffer + SS.Send(prebuffer.str().c_str());//write buffer prebuffer.str("");//clear buffer SS.Send(pack_out.toNetPacked()); }else{ @@ -451,7 +455,9 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int Socket.close();//disconnect user return; } - SS.Send("P "+Socket.getHost()+'\n'); + SS.Send("P "); + SS.Send(Socket.getHost().c_str()); + SS.Send("\n"); nostats = true; #if DEBUG >= 4 fprintf(stderr, "Connected to buffer, starting to send data...\n"); @@ -503,7 +509,9 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int play_msgtype = messagetype; play_streamid = stream_id; stream_inited = false; - SS.Send("seek " + JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString() + "\n"); + SS.Send("s "); + SS.Send(JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString().c_str()); + SS.Send("\n"); return; }//seek diff --git a/src/player.cpp b/src/player.cpp index 8e36d434..bbd19192 100644 --- a/src/player.cpp +++ b/src/player.cpp @@ -1,6 +1,10 @@ /// \file player.cpp /// Holds all code for the MistPlayer application used for VoD streams. +#if DEBUG >= 4 +#include //for std::cerr +#endif + #include //for fileno #include #include @@ -23,14 +27,17 @@ int main(int argc, char** argv){ int playing = 0; DTSC::File source = DTSC::File(conf.getString("filename")); - Socket::Connection in_out = Socket::Connection(fileno(stdin), fileno(stdout)); + Socket::Connection in_out = Socket::Connection(fileno(stdout), fileno(stdin)); std::string meta_str = source.getHeader(); - + JSON::Value pausemark; + pausemark["datatype"] = "pause_marker"; + pausemark["time"] = (long long int)0; + //send the header { in_out.Send("DTSC"); unsigned int size = htonl(meta_str.size()); - in_out.Send(std::string((char*)&size, (size_t)4)); + in_out.Send((char*)&size, 4); in_out.Send(meta_str); } @@ -40,60 +47,104 @@ int main(int argc, char** argv){ long long now, timeDiff = 0, lastTime = 0; while (in_out.connected()){ - if (in_out.spool() && in_out.Received().find('\n') != std::string::npos){ - std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n')); - in_out.Received().erase(0, in_out.Received().find('\n')+1); - if (cmd != ""){ - switch (cmd[0]){ - case 'P':{ //Push - in_out.close();//pushing to VoD makes no sense - } break; - case 'S':{ //Stats - /// \todo Parse stats command properly. - /* Stats(cmd.substr(2)); */ - } break; - case 's':{ //second-seek - int second = JSON::Value(cmd.substr(2)).asInt(); - double keyms = meta["video"]["keyms"].asInt(); - if (keyms <= 0){keyms = 2000;} - source.seek_frame(second / (keyms / 1000.0)); - } break; - case 'f':{ //frame-seek - source.seek_frame(JSON::Value(cmd.substr(2)).asInt()); - } break; - case 'p':{ //play - playing = -1; - } break; - case 'o':{ //once-play - if (playing < 0){playing = 0;} - ++playing; - } break; - case 'q':{ //quit-playing - playing = 0; - } break; + if (in_out.spool()){ + while (in_out.Received().find('\n') != std::string::npos){ + std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n')); + in_out.Received().erase(0, in_out.Received().find('\n')+1); + if (cmd != ""){ + switch (cmd[0]){ + case 'P':{ //Push + #if DEBUG >= 4 + std::cerr << "Received push - ignoring (" << cmd << ")" << std::endl; + #endif + in_out.close();//pushing to VoD makes no sense + } break; + case 'S':{ //Stats + #if DEBUG >= 4 + //std::cerr << "Received stats - ignoring (" << cmd << ")" << std::endl; + #endif + /// \todo Parse stats command properly. + /* Stats(cmd.substr(2)); */ + } break; + case 's':{ //second-seek + #if DEBUG >= 4 + std::cerr << "Received ms-seek (" << cmd << ")" << std::endl; + #endif + int ms = JSON::Value(cmd.substr(2)).asInt(); + bool ret = source.seek_time(ms); + #if DEBUG >= 4 + std::cerr << "Second-seek completed (time " << ms << "ms) " << ret << std::endl; + #endif + } break; + case 'f':{ //frame-seek + #if DEBUG >= 4 + std::cerr << "Received frame-seek (" << cmd << ")" << std::endl; + #endif + bool ret = source.seek_frame(JSON::Value(cmd.substr(2)).asInt()); + #if DEBUG >= 4 + std::cerr << "Frame-seek completed " << ret << std::endl; + #endif + } break; + case 'p':{ //play + #if DEBUG >= 4 + std::cerr << "Received play" << std::endl; + #endif + playing = -1; + in_out.setBlocking(false); + } break; + case 'o':{ //once-play + #if DEBUG >= 4 + std::cerr << "Received once-play" << std::endl; + #endif + if (playing <= 0){playing = 1;} + ++playing; + in_out.setBlocking(false); + } break; + case 'q':{ //quit-playing + #if DEBUG >= 4 + std::cerr << "Received quit-playing" << std::endl; + #endif + playing = 0; + in_out.setBlocking(true); + } break; + } } } } if (playing != 0){ now = getNowMS(); - if (now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) { - std::string packet = source.getPacket(); - last_pack = JSON::fromDTMI(packet); - lastTime = last_pack["time"].asInt(); - if ((now - timeDiff - lastTime) > 15000 || (now - timeDiff - lastTime < -15000)){ + if (playing > 0 || now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) { + source.seekNext(); + lastTime = source.getJSON()["time"].asInt(); + if ((now - timeDiff - lastTime) > 5000 || (now - timeDiff - lastTime < -5000)){ timeDiff = now - lastTime; } - //insert proper header for this type of data - in_out.Send("DTPD"); - //insert the packet length - unsigned int size = htonl(packet.size()); - in_out.Send(std::string((char*)&size, (size_t)4)); - in_out.Send(packet); + if (source.getJSON().isMember("keyframe")){ + if (playing > 0){--playing;} + if (playing == 0){ + #if DEBUG >= 4 + std::cerr << "Sending pause_marker" << std::endl; + #endif + pausemark["time"] = (long long int)now; + pausemark.toPacked(); + in_out.Send(pausemark.toNetPacked()); + in_out.flush(); + in_out.setBlocking(true); + } + } + if (playing != 0){ + //insert proper header for this type of data + in_out.Send("DTPD"); + //insert the packet length + unsigned int size = htonl(source.getPacket().size()); + in_out.Send((char*)&size, 4); + in_out.Send(source.getPacket()); + } } else { - usleep(std::min(14999LL, lastTime - (now - timeDiff)) * 1000); + usleep(std::min(10000LL, lastTime - (now - timeDiff)) * 1000); } - if (playing > 0){--playing;} } + usleep(10000);//sleep 10ms } return 0; }