diff --git a/src/conn_http.cpp b/src/conn_http.cpp index becd7ea4..a83deb7a 100644 --- a/src/conn_http.cpp +++ b/src/conn_http.cpp @@ -91,14 +91,14 @@ namespace Connector_HTTP{ H.Clean(); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetBody("Unsupported Media Type

Unsupported Media Type

The server isn't quite sure what you wanted to receive from it."); - conn->Send(H.BuildResponse("415", "Unsupported Media Type")); + conn->SendNow(H.BuildResponse("415", "Unsupported Media Type")); } void Handle_Timeout(HTTP::Parser & H, Socket::Connection * conn){ H.Clean(); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetBody("Gateway timeout

Gateway timeout

Though the server understood your request and attempted to handle it, somehow handling it took longer than it should. Your request has been cancelled - please try again later."); - conn->Send(H.BuildResponse("504", "Gateway Timeout")); + conn->SendNow(H.BuildResponse("504", "Gateway Timeout")); } /// Handles internal requests. @@ -111,7 +111,7 @@ namespace Connector_HTTP{ H.SetHeader("Content-Type", "text/xml"); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); H.SetBody(""); - conn->Send(H.BuildResponse("200", "OK")); + conn->SendNow(H.BuildResponse("200", "OK")); return; }//crossdomain.xml @@ -173,7 +173,7 @@ namespace Connector_HTTP{ response.append("(\"" + streamname + "\"));\n"); } H.SetBody(response); - conn->Send(H.BuildResponse("200", "OK")); + conn->SendNow(H.BuildResponse("200", "OK")); return; }//embed code generator @@ -235,7 +235,7 @@ namespace Connector_HTTP{ return; } //forward the original request - connconn[uid]->conn->Send(request); + connconn[uid]->conn->SendNow(request); connconn[uid]->lastuse = 0; unsigned int timeout = 0; //wait for a response @@ -267,13 +267,13 @@ namespace Connector_HTTP{ //known length - simply re-send the request with added headers and continue H.SetHeader("X-UID", uid); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); - conn->Send(H.BuildResponse("200", "OK")); + conn->SendNow(H.BuildResponse("200", "OK")); conn->flush(); }else{ //unknown length H.SetHeader("X-UID", uid); H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); - conn->Send(H.BuildResponse("200", "OK")); + conn->SendNow(H.BuildResponse("200", "OK")); //switch out the connection for an empty one - it makes no sense to keep these globally Socket::Connection * myConn = connconn[uid]->conn; connconn[uid]->conn = new Socket::Connection(); @@ -282,9 +282,8 @@ namespace Connector_HTTP{ while (myConn->connected() && conn->connected()){ if (myConn->Received().size() || myConn->spool()){ //forward any and all incoming data directly without parsing - conn->Send(myConn->Received().get()); + conn->SendNow(myConn->Received().get()); myConn->Received().get().clear(); - conn->flush(); }else{ usleep(30000); } diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index ff4587e8..6c8af80e 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -26,21 +26,29 @@ /// Holds everything unique to HTTP Dynamic Connector. namespace Connector_HTTP{ - std::string GenerateBootstrap(std::string & MovieId, JSON::Value & metadata){ + std::string GenerateBootstrap(std::string & MovieId, JSON::Value & metadata, int fragnum, int starttime){ MP4::AFRT afrt; - afrt.SetUpdate(false); + if (starttime == 0){ + afrt.SetUpdate(false); + }else{ + afrt.SetUpdate(true); + } afrt.SetTimeScale(1000); afrt.AddQualityEntry(""); if (!metadata.isMember("video") || !metadata["video"].isMember("keyms") || metadata["video"]["keyms"].asInt() == 0){ //metadata["lasttime"].asInt()? - afrt.AddFragmentRunEntry(1, 0, 2000); //FirstFragment, FirstFragmentTimestamp,Fragment Duration in milliseconds + afrt.AddFragmentRunEntry(fragnum, starttime, 2000); //FirstFragment, FirstFragmentTimestamp,Fragment Duration in milliseconds }else{ - afrt.AddFragmentRunEntry(1, 0, metadata["video"]["keyms"].asInt()); //FirstFragment, FirstFragmentTimestamp,Fragment Duration in milliseconds + afrt.AddFragmentRunEntry(fragnum, starttime, metadata["video"]["keyms"].asInt()); //FirstFragment, FirstFragmentTimestamp,Fragment Duration in milliseconds } afrt.WriteContent(); MP4::ASRT asrt; - asrt.SetUpdate(false); + if (starttime == 0){ + asrt.SetUpdate(false); + }else{ + asrt.SetUpdate(true); + } asrt.AddQualityEntry(""); /// \todo Actually use correct number of fragments. asrt.AddSegmentRunEntry(1, 20000);//1 Segment, 20000 Fragments @@ -58,7 +66,11 @@ namespace Connector_HTTP{ abst.SetLive(true); abst.SetMediaTime(0xFFFFFFFF);//metadata["lasttime"].asInt()? } - abst.SetUpdate(false); + if (starttime == 0){ + abst.SetUpdate(false); + }else{ + abst.SetUpdate(true); + } abst.SetTimeScale(1000); abst.SetSMPTE(0); abst.SetMovieIdentifier(MovieId); @@ -67,13 +79,11 @@ namespace Connector_HTTP{ abst.AddServerEntry(""); abst.AddQualityEntry(""); abst.WriteContent(); - - std::string Result; - Result.append((char*)abst.GetBoxedData(), (int)abst.GetBoxedDataSize()); - #if DEBUG >= 8 + + //#if DEBUG >= 8 std::cout << "Sending bootstrap:" << std::endl << abst.toPrettyString(0) << std::endl; - #endif - return Base64::encode(Result); + //#endif + return std::string((char*)abst.GetBoxedData(), (int)abst.GetBoxedDataSize()); } @@ -91,7 +101,7 @@ namespace Connector_HTTP{ "recorded\n" "streaming\n" "\n" - "" + GenerateBootstrap(MovieId, metadata) + "\n" + "" + Base64::encode(GenerateBootstrap(MovieId, metadata, 1, 0)) + "\n" "\n" "\n"; }else{ @@ -101,7 +111,7 @@ namespace Connector_HTTP{ "video/mp4\n" "live\n" "streaming\n" - "" + GenerateBootstrap(MovieId, metadata) + "\n" + "" + Base64::encode(GenerateBootstrap(MovieId, metadata, 1, 0)) + "\n" "\n" "\n"; } @@ -113,9 +123,10 @@ namespace Connector_HTTP{ /// Main function for Connector_HTTP_Dynamic int Connector_HTTP_Dynamic(Socket::Connection conn){ - std::stringstream FlashBuf; - int flashbuf_nonempty = 0; - FLV::Tag tmp;//temporary tag, for init data + std::deque FlashBuf; + int FlashBufSize = 0; + long long int FlashBufTime = 0; + FLV::Tag tmp;//temporary tag DTSC::Stream Strm;//Incoming stream buffer. HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender. @@ -126,7 +137,6 @@ namespace Connector_HTTP{ bool inited = false; Socket::Connection ss(-1); std::string streamname; - FLV::Tag tag;///< Temporary tag buffer. std::string recBuffer = ""; std::string Quality; @@ -155,7 +165,7 @@ namespace Connector_HTTP{ ss.close(); HTTP_S.Clean(); HTTP_S.SetBody("No such stream is available on the system. Please try again.\n"); - conn.Send(HTTP_S.BuildResponse("404", "Not found")); + conn.SendNow(HTTP_S.BuildResponse("404", "Not found")); ready4data = false; continue; } @@ -173,8 +183,7 @@ namespace Connector_HTTP{ #endif std::stringstream sstream; sstream << "f " << ReqFragment << "\no \n"; - ss.Send(sstream.str().c_str()); - ss.flush(); + ss.SendNow(sstream.str().c_str()); Flash_RequestPending++; }else{ streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1); @@ -185,7 +194,7 @@ namespace Connector_HTTP{ if (Strm.metadata.isMember("length")){receive_marks = true;} std::string manifest = BuildManifest(streamname, Strm.metadata); HTTP_S.SetBody(manifest); - conn.Send(HTTP_S.BuildResponse("200", "OK")); + conn.SendNow(HTTP_S.BuildResponse("200", "OK")); #if DEBUG >= 3 printf("Sent manifest\n"); #endif @@ -198,7 +207,11 @@ namespace Connector_HTTP{ HTTP_R.Clean(); //clean for any possible next requests } }else{ - usleep(10000);//sleep 10ms + if (Flash_RequestPending){ + usleep(1000);//sleep 1ms + }else{ + usleep(10000);//sleep 10ms + } } if (ready4data){ if (!inited){ @@ -211,7 +224,7 @@ namespace Connector_HTTP{ ss.close(); HTTP_S.Clean(); HTTP_S.SetBody("No such stream is available on the system. Please try again.\n"); - conn.Send(HTTP_S.BuildResponse("404", "Not found")); + conn.SendNow(HTTP_S.BuildResponse("404", "Not found")); ready4data = false; continue; } @@ -225,10 +238,10 @@ namespace Connector_HTTP{ if (now != lastStats){ lastStats = now; ss.Send("S "); - ss.Send(conn.getStats("HTTP_Dynamic").c_str()); + ss.SendNow(conn.getStats("HTTP_Dynamic").c_str()); } - if (ss.spool() || ss.Received().size()){ - if (Strm.parsePacket(ss.Received())){ + if (ss.spool()){ + while (Strm.parsePacket(ss.Received())){ if (Strm.getPacket(0).isMember("time")){ if (!Strm.metadata.isMember("firsttime")){ Strm.metadata["firsttime"] = Strm.getPacket(0)["time"]; @@ -239,9 +252,6 @@ namespace Connector_HTTP{ } Strm.metadata["lasttime"] = Strm.getPacket(0)["time"]; } - if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){ - tag.DTSCLoader(Strm); - } if (pending_manifest){ HTTP_S.Clean(); HTTP_S.SetHeader("Content-Type","text/xml"); @@ -249,7 +259,7 @@ namespace Connector_HTTP{ if (Strm.metadata.isMember("length")){receive_marks = true;} std::string manifest = BuildManifest(streamname, Strm.metadata); HTTP_S.SetBody(manifest); - conn.Send(HTTP_S.BuildResponse("200", "OK")); + conn.SendNow(HTTP_S.BuildResponse("200", "OK")); #if DEBUG >= 3 printf("Sent manifest\n"); #endif @@ -257,64 +267,88 @@ namespace Connector_HTTP{ } if (!receive_marks && Strm.metadata.isMember("length")){receive_marks = true;} if ((Strm.getPacket(0).isMember("keyframe") && !receive_marks) || Strm.lastType() == DTSC::PAUSEMARK){ - if (flashbuf_nonempty || Strm.lastType() == DTSC::PAUSEMARK){ - #if DEBUG >= 4 - fprintf(stderr, "Received a %s fragment of %i packets.\n", Strm.getPacket(0)["datatype"].asString().c_str(), flashbuf_nonempty); + #if DEBUG >= 4 + fprintf(stderr, "Received a %s fragment of %i bytes.\n", Strm.getPacket(0)["datatype"].asString().c_str(), FlashBufSize); + #endif + if (Flash_RequestPending > 0 && FlashBufSize){ + #if DEBUG >= 3 + fprintf(stderr, "Sending a fragment..."); #endif - if (Flash_RequestPending > 0){ - HTTP_S.Clean(); - HTTP_S.SetHeader("Content-Type","video/mp4"); - HTTP_S.SetBody(MP4::mdatFold(FlashBuf.str())); - conn.Send(HTTP_S.BuildResponse("200", "OK")); - Flash_RequestPending--; - #if DEBUG >= 3 - fprintf(stderr, "Sending a fragment..."); - #endif - conn.flush(); - #if DEBUG >= 3 - fprintf(stderr, "Done\n"); - #endif + static std::string btstrp; + btstrp = GenerateBootstrap(streamname, Strm.metadata, ReqFragment, FlashBufTime); + HTTP_S.Clean(); + HTTP_S.SetHeader("Content-Type", "video/mp4"); + HTTP_S.SetBody(""); + HTTP_S.SetHeader("Content-Length", FlashBufSize+32+33+btstrp.size()); + conn.SendNow(HTTP_S.BuildResponse("200", "OK")); + conn.SendNow("\x00\x00\x00\x21" "afra\x00\x00\x00\x00\x00\x00\x00\x03\xE8\x00\x00\x00\x01", 21); + unsigned long tmptime = htonl(FlashBufTime << 32); + conn.SendNow((char*)&tmptime, 4); + tmptime = htonl(FlashBufTime & 0xFFFFFFFF); + conn.SendNow((char*)&tmptime, 4); + tmptime = htonl(65); + conn.SendNow((char*)&tmptime, 4); + + conn.SendNow(btstrp); + + conn.SendNow("\x00\x00\x00\x18moof\x00\x00\x00\x10mfhd\x00\x00\x00\x00", 20); + unsigned long fragno = htonl(ReqFragment); + conn.SendNow((char*)&fragno, 4); + unsigned long size = htonl(FlashBufSize+8); + conn.SendNow((char*)&size, 4); + conn.SendNow("mdat", 4); + while (FlashBuf.size() > 0){ + conn.SendNow(FlashBuf.front()); + FlashBuf.pop_front(); } + Flash_RequestPending--; + #if DEBUG >= 3 + fprintf(stderr, "Done\n"); + #endif } - FlashBuf.str(""); - flashbuf_nonempty = 0; - //fill buffer with init data, if needed. - if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ - tmp.DTSCAudioInit(Strm); - FlashBuf.write(tmp.data, tmp.len); - } - if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){ - tmp.DTSCVideoInit(Strm); - FlashBuf.write(tmp.data, tmp.len); - } + FlashBuf.clear(); + FlashBufSize = 0; } if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){ - ++flashbuf_nonempty; - FlashBuf.write(tag.data, tag.len); - } - }else{ - if (pending_manifest && !Strm.metadata.isNull()){ - HTTP_S.Clean(); - HTTP_S.SetHeader("Content-Type","text/xml"); - HTTP_S.SetHeader("Cache-Control","no-cache"); - if (Strm.metadata.isMember("length")){receive_marks = true;} - std::string manifest = BuildManifest(streamname, Strm.metadata); - HTTP_S.SetBody(manifest); - conn.Send(HTTP_S.BuildResponse("200", "OK")); - #if DEBUG >= 3 - printf("Sent manifest\n"); - #endif - pending_manifest = false; + if (FlashBufSize == 0){ + //fill buffer with init data, if needed. + if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ + tmp.DTSCAudioInit(Strm); + FlashBuf.push_back(std::string(tmp.data, tmp.len)); + FlashBufSize += tmp.len; + } + if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){ + tmp.DTSCVideoInit(Strm); + FlashBuf.push_back(std::string(tmp.data, tmp.len)); + FlashBufSize += tmp.len; + } + FlashBufTime = Strm.getPacket(0)["time"].asInt(); + } + tmp.DTSCLoader(Strm); + FlashBuf.push_back(std::string(tmp.data, tmp.len)); + FlashBufSize += tmp.len; } } + if (pending_manifest && !Strm.metadata.isNull()){ + HTTP_S.Clean(); + HTTP_S.SetHeader("Content-Type","text/xml"); + HTTP_S.SetHeader("Cache-Control","no-cache"); + if (Strm.metadata.isMember("length")){receive_marks = true;} + std::string manifest = BuildManifest(streamname, Strm.metadata); + HTTP_S.SetBody(manifest); + conn.SendNow(HTTP_S.BuildResponse("200", "OK")); + #if DEBUG >= 3 + printf("Sent manifest\n"); + #endif + pending_manifest = false; + } } if (!ss.connected()){break;} } } conn.close(); ss.Send("S "); - ss.Send(conn.getStats("HTTP_Dynamic").c_str()); - ss.flush(); + ss.SendNow(conn.getStats("HTTP_Dynamic").c_str()); ss.close(); #if DEBUG >= 1 if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());} diff --git a/src/conn_http_progressive.cpp b/src/conn_http_progressive.cpp index 83227aab..ab4adbd8 100644 --- a/src/conn_http_progressive.cpp +++ b/src/conn_http_progressive.cpp @@ -55,7 +55,7 @@ namespace Connector_HTTP{ HTTP_R.Clean(); //clean for any possible next requests } }else{ - usleep(10000);//sleep 10ms + usleep(1000);//sleep 1ms } if (ready4data){ if (!inited){ @@ -68,58 +68,57 @@ namespace Connector_HTTP{ ss.close(); HTTP_S.Clean(); HTTP_S.SetBody("No such stream is available on the system. Please try again.\n"); - conn.Send(HTTP_S.BuildResponse("404", "Not found")); + conn.SendNow(HTTP_S.BuildResponse("404", "Not found")); ready4data = false; continue; } if (seek_pos){ std::stringstream cmd; cmd << "s " << seek_pos << "\n"; - ss.Send(cmd.str().c_str()); + ss.SendNow(cmd.str().c_str()); } #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif - ss.Send("p\n"); - ss.flush(); + ss.SendNow("p\n"); inited = true; } unsigned int now = time(0); if (now != lastStats){ lastStats = now; ss.Send("S "); - ss.Send(conn.getStats("HTTP_Progressive").c_str()); + ss.SendNow(conn.getStats("HTTP_Progressive").c_str()); } - if (ss.spool() || ss.Received().size()){ - if (Strm.parsePacket(ss.Received())){ - tag.DTSCLoader(Strm); + if (ss.spool()){ + while (Strm.parsePacket(ss.Received())){ if (!progressive_has_sent_header){ HTTP_S.Clean();//make sure no parts of old requests are left in any buffers HTTP_S.SetHeader("Content-Type", "video/x-flv");//Send the correct content-type for FLV files //HTTP_S.SetHeader("Transfer-Encoding", "chunked"); HTTP_S.protocol = "HTTP/1.0"; - conn.Send(HTTP_S.BuildResponse("200", "OK"));//no SetBody = unknown length - this is intentional, we will stream the entire file - conn.Send(FLV::Header, 13);//write FLV header + conn.SendNow(HTTP_S.BuildResponse("200", "OK"));//no SetBody = unknown length - this is intentional, we will stream the entire file + conn.SendNow(FLV::Header, 13);//write FLV header static FLV::Tag tmp; //write metadata tmp.DTSCMetaInit(Strm); - conn.Send(tmp.data, tmp.len); + conn.SendNow(tmp.data, tmp.len); //write video init data, if needed if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){ tmp.DTSCVideoInit(Strm); - conn.Send(tmp.data, tmp.len); + conn.SendNow(tmp.data, tmp.len); } //write audio init data, if needed if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ tmp.DTSCAudioInit(Strm); - conn.Send(tmp.data, tmp.len); + conn.SendNow(tmp.data, tmp.len); } progressive_has_sent_header = true; #if DEBUG >= 1 fprintf(stderr, "Sent progressive FLV header\n"); #endif } - conn.Send(tag.data, tag.len);//write the tag contents + tag.DTSCLoader(Strm); + conn.SendNow(tag.data, tag.len);//write the tag contents } } if (!ss.connected()){break;} @@ -127,8 +126,7 @@ namespace Connector_HTTP{ } conn.close(); ss.Send("S "); - ss.Send(conn.getStats("HTTP_Dynamic").c_str()); - ss.flush(); + ss.SendNow(conn.getStats("HTTP_Dynamic").c_str()); ss.close(); #if DEBUG >= 1 if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());} diff --git a/src/conn_raw.cpp b/src/conn_raw.cpp index f3c416a4..2c8d8242 100644 --- a/src/conn_raw.cpp +++ b/src/conn_raw.cpp @@ -38,15 +38,12 @@ int main(int argc, char ** argv) { lastStats = now; std::stringstream st; st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; - std::string tmp = st.str(); - S.Send(tmp); + S.SendNow(st.str().c_str()); } } std::stringstream st; st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; - std::string tmp = st.str(); - S.Send(tmp); - S.flush(); + S.SendNow(st.str().c_str()); S.close(); return 0; } diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp index 355a1a63..441a060d 100644 --- a/src/conn_rtmp.cpp +++ b/src/conn_rtmp.cpp @@ -57,7 +57,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ RTMPStream::rec_cnt += 1537; if (RTMPStream::doHandshake()){ - Socket.Send(RTMPStream::handshake_out); + Socket.SendNow(RTMPStream::handshake_out); while (!Socket.Received().available(1536) && Socket.connected()){Socket.spool(); usleep(5000);} Socket.Received().remove(1536); RTMPStream::rec_cnt += 1536; @@ -74,10 +74,12 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ unsigned int lastStats = 0; while (Socket.connected()){ - if (Socket.Received().size() || Socket.spool()){ - parseChunk(Socket.Received().get()); + if (Socket.spool() || Socket.Received().size()){ + while (Socket.Received().size()){ + parseChunk(Socket.Received().get()); + } }else{ - usleep(10000);//sleep 10ms to prevent high CPU usage + usleep(1000);//sleep 1ms to prevent high CPU usage } if (ready4data){ if (!inited){ @@ -94,7 +96,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif - SS.Send("p\n");SS.flush(); + SS.SendNow("p\n"); inited = true; } if (inited && !nostats){ @@ -102,11 +104,11 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ if (now != lastStats){ lastStats = now; SS.Send("S "); - SS.Send(Socket.getStats("RTMP").c_str()); + SS.SendNow(Socket.getStats("RTMP").c_str()); } } - if (SS.spool() || SS.Received().size()){ - if (Strm.parsePacket(SS.Received())){ + if (SS.spool()){ + while (Strm.parsePacket(SS.Received())){ if (play_trans != -1){ //send a status reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); @@ -148,20 +150,20 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ //sent init data if needed if (!stream_inited){ init_tag.DTSCMetaInit(Strm); - Socket.Send(RTMPStream::SendMedia(init_tag)); + Socket.SendNow(RTMPStream::SendMedia(init_tag)); if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){ init_tag.DTSCAudioInit(Strm); - Socket.Send(RTMPStream::SendMedia(init_tag)); + Socket.SendNow(RTMPStream::SendMedia(init_tag)); } if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){ init_tag.DTSCVideoInit(Strm); - Socket.Send(RTMPStream::SendMedia(init_tag)); + Socket.SendNow(RTMPStream::SendMedia(init_tag)); } stream_inited = true; } //sent a tag tag.DTSCLoader(Strm); - Socket.Send(RTMPStream::SendMedia(tag)); + Socket.SendNow(RTMPStream::SendMedia(tag)); #if DEBUG >= 8 fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), tag.tagTime(), tag.tagType().c_str()); #endif @@ -171,8 +173,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } Socket.close(); SS.Send("S "); - SS.Send(Socket.getStats("RTMP").c_str()); - SS.flush(); + SS.SendNow(Socket.getStats("RTMP").c_str()); SS.close(); #if DEBUG >= 1 if (FLV::Parse_Error){fprintf(stderr, "FLV Parse Error: %s\n", FLV::Error_Str.c_str());} @@ -281,15 +282,15 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ counter++; if (counter > 8){ sending = true; - SS.Send(meta_out.toNetPacked()); - SS.Send(prebuffer.str().c_str());//write buffer + SS.SendNow(meta_out.toNetPacked()); + SS.SendNow(prebuffer.str().c_str());//write buffer prebuffer.str("");//clear buffer SS.Send(pack_out.toNetPacked()); }else{ prebuffer << pack_out.toNetPacked(); } }else{ - SS.Send(pack_out.toNetPacked()); + SS.SendNow(pack_out.toNetPacked()); } } }else{ @@ -357,9 +358,9 @@ void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int st std::cerr << amfreply.Print() << std::endl; #endif if (messagetype == 17){ - Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack())); + Socket.SendNow(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack())); }else{ - Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack())); + Socket.SendNow(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack())); } }//sendCommand diff --git a/src/player.cpp b/src/player.cpp index 0e0fd479..263d28b3 100644 --- a/src/player.cpp +++ b/src/player.cpp @@ -70,7 +70,7 @@ int main(int argc, char** argv){ pausemark["time"] = (long long int)0; Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); - int lasttime = time(0); + int lasttime = time(0);//time last packet was sent //send the header { @@ -81,95 +81,100 @@ int main(int argc, char** argv){ } JSON::Value meta = JSON::fromDTMI(meta_str); + if (meta["video"]["keyms"].asInt() < 11){ + meta["video"]["keyms"] = (long long int)1000; + } JSON::Value last_pack; bool meta_sent = false; - long long now, timeDiff = 0, lastTime = 0; + long long now, lastTime = 0;//for timing of sending packets + long long bench = 0;//for benchmarking Stats sts; while (in_out.connected() && std::cin.good() && std::cout.good() && (time(0) - lasttime < 60)){ - if (in_out.Received().size() || in_out.spool()){ - //delete anything that doesn't end with a newline - if (!in_out.Received().get().empty() && *(in_out.Received().get().rbegin()) != '\n'){ - in_out.Received().get().clear(); - continue; - } - in_out.Received().get().resize(in_out.Received().get().size() - 1); - if (!in_out.Received().get().empty()){ - switch (in_out.Received().get()[0]){ - case 'P':{ //Push - #if DEBUG >= 4 - std::cerr << "Received push - ignoring (" << in_out.Received().get() << ")" << std::endl; - #endif - in_out.close();//pushing to VoD makes no sense - } break; - case 'S':{ //Stats - if (!StatsSocket.connected()){ - StatsSocket = Socket::Connection("/tmp/mist/statistics", true); - } - if (StatsSocket.connected()){ - sts = Stats(in_out.Received().get().substr(2)); - JSON::Value json_sts; - json_sts["vod"]["down"] = (long long int)sts.down; - json_sts["vod"]["up"] = (long long int)sts.up; - json_sts["vod"]["time"] = (long long int)sts.conntime; - json_sts["vod"]["host"] = sts.host; - json_sts["vod"]["connector"] = sts.connector; - json_sts["vod"]["filename"] = conf.getString("filename"); - json_sts["vod"]["now"] = (long long int)time(0); - json_sts["vod"]["start"] = (long long int)(time(0) - sts.conntime); - if (!meta_sent){ - json_sts["vod"]["meta"] = meta; - meta_sent = true; - } - StatsSocket.Send(json_sts.toString().c_str()); - StatsSocket.Send("\n\n"); - StatsSocket.flush(); - } - } break; - case 's':{ //second-seek - int ms = JSON::Value(in_out.Received().get().substr(2)).asInt(); - bool ret = source.seek_time(ms); - } break; - case 'f':{ //frame-seek - bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt()); - } break; - case 'p':{ //play - playing = -1; - in_out.setBlocking(false); - } break; - case 'o':{ //once-play - if (playing <= 0){playing = 1;} - ++playing; - in_out.setBlocking(false); - } break; - case 'q':{ //quit-playing - playing = 0; - in_out.setBlocking(true); - } break; + if (in_out.spool()){ + while (in_out.Received().size()){ + //delete anything that doesn't end with a newline + if (*(in_out.Received().get().rbegin()) != '\n'){ + in_out.Received().get().clear(); + continue; + } + in_out.Received().get().resize(in_out.Received().get().size() - 1); + if (!in_out.Received().get().empty()){ + switch (in_out.Received().get()[0]){ + case 'P':{ //Push + #if DEBUG >= 4 + std::cerr << "Received push - ignoring (" << in_out.Received().get() << ")" << std::endl; + #endif + in_out.close();//pushing to VoD makes no sense + } break; + case 'S':{ //Stats + if (!StatsSocket.connected()){ + StatsSocket = Socket::Connection("/tmp/mist/statistics", true); + } + if (StatsSocket.connected()){ + sts = Stats(in_out.Received().get().substr(2)); + JSON::Value json_sts; + json_sts["vod"]["down"] = (long long int)sts.down; + json_sts["vod"]["up"] = (long long int)sts.up; + json_sts["vod"]["time"] = (long long int)sts.conntime; + json_sts["vod"]["host"] = sts.host; + json_sts["vod"]["connector"] = sts.connector; + json_sts["vod"]["filename"] = conf.getString("filename"); + json_sts["vod"]["now"] = (long long int)time(0); + json_sts["vod"]["start"] = (long long int)(time(0) - sts.conntime); + if (!meta_sent){ + json_sts["vod"]["meta"] = meta; + meta_sent = true; + } + StatsSocket.Send(json_sts.toString().c_str()); + StatsSocket.Send("\n\n"); + StatsSocket.flush(); + } + } break; + case 's':{ //second-seek + int ms = JSON::Value(in_out.Received().get().substr(2)).asInt(); + bool ret = source.seek_time(ms); + } break; + case 'f':{ //frame-seek + bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt()); + } break; + case 'p':{ //play + playing = -1; + in_out.setBlocking(false); + } break; + case 'o':{ //once-play + if (playing <= 0){playing = 1;} + ++playing; + in_out.setBlocking(false); + #if DEBUG >= 4 + std::cerr << "Playing one keyframe" << std::endl; + #endif + bench = getNowMS(); + } break; + case 'q':{ //quit-playing + playing = 0; + in_out.setBlocking(true); + } break; + } + in_out.Received().get().clear(); } - in_out.Received().get().clear(); } } if (playing != 0){ now = getNowMS(); - /// \todo This makes no sense. We're timing for packets here, but sending a whole keyframe. Fix. ASAP. - if (playing > 0 || now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) { + if (playing > 0 || meta["video"]["keyms"].asInt() <= now-lastTime) { source.seekNext(); - lastTime = source.getJSON()["time"].asInt(); - if ((now - timeDiff - lastTime) > 5000 || (now - timeDiff - lastTime < -5000)){ - timeDiff = now - lastTime; - } if (source.getJSON().isMember("keyframe")){ + lastTime = now; if (playing > 0){--playing;} if (playing == 0){ #if DEBUG >= 4 - std::cerr << "Sending pause_marker" << std::endl; + std::cerr << "Sending pause_marker (" << (getNowMS() - bench) << "ms)" << std::endl; #endif pausemark["time"] = (long long int)now; pausemark.toPacked(); - in_out.Send(pausemark.toNetPacked()); - in_out.flush(); + in_out.SendNow(pausemark.toNetPacked()); in_out.setBlocking(true); } } @@ -180,11 +185,10 @@ int main(int argc, char** argv){ //insert the packet length unsigned int size = htonl(source.getPacket().size()); in_out.Send((char*)&size, 4); - in_out.Send(source.getPacket()); - in_out.flush(); + in_out.SendNow(source.getPacket()); } } else { - usleep(std::min(10000LL, lastTime - (now - timeDiff)) * 1000); + usleep((meta["video"]["keyms"].asInt()-(now-lastTime))*1000); } }else{ usleep(10000);//sleep 10ms