Updated everything for new socket lib requirements - working VoD through HTTP progressive, HTTP dynamic is almost working and RTMP is severely broken altogether.
This commit is contained in:
parent
5165aae7e3
commit
ef412b62da
9 changed files with 215 additions and 94 deletions
|
@ -33,6 +33,7 @@ namespace Buffer{
|
||||||
|
|
||||||
void handleStats(void * empty){
|
void handleStats(void * empty){
|
||||||
if (empty != 0){return;}
|
if (empty != 0){return;}
|
||||||
|
std::string double_newline = "\n\n";
|
||||||
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
||||||
while (buffer_running){
|
while (buffer_running){
|
||||||
usleep(1000000); //sleep one second
|
usleep(1000000); //sleep one second
|
||||||
|
@ -40,7 +41,8 @@ namespace Buffer{
|
||||||
StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
||||||
}
|
}
|
||||||
if (StatsSocket.connected()){
|
if (StatsSocket.connected()){
|
||||||
StatsSocket.Send(Stream::get()->getStats()+"\n\n");
|
StatsSocket.Send(Stream::get()->getStats());
|
||||||
|
StatsSocket.Send(double_newline);
|
||||||
StatsSocket.flush();
|
StatsSocket.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -140,7 +142,7 @@ namespace Buffer{
|
||||||
inBuffer.append(charBuffer, charCount);
|
inBuffer.append(charBuffer, charCount);
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
usleep(std::min(14999LL, lastTime - (now - timeDiff)) * 1000);
|
usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
buffer_running = false;
|
buffer_running = false;
|
||||||
|
|
|
@ -45,7 +45,8 @@ Buffer::Stream::~Stream(){
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Calculate and return the current statistics in JSON format.
|
/// Calculate and return the current statistics in JSON format.
|
||||||
std::string Buffer::Stream::getStats(){
|
std::string & Buffer::Stream::getStats(){
|
||||||
|
static std::string ret;
|
||||||
unsigned int now = time(0);
|
unsigned int now = time(0);
|
||||||
unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
|
unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
|
||||||
stats_mutex.lock();
|
stats_mutex.lock();
|
||||||
|
@ -64,7 +65,7 @@ std::string Buffer::Stream::getStats(){
|
||||||
Storage["meta"] = Strm->metadata;
|
Storage["meta"] = Strm->metadata;
|
||||||
if (Storage["meta"].isMember("audio")){Storage["meta"]["audio"].removeMember("init");}
|
if (Storage["meta"].isMember("audio")){Storage["meta"]["audio"].removeMember("init");}
|
||||||
if (Storage["meta"].isMember("video")){Storage["meta"]["video"].removeMember("init");}
|
if (Storage["meta"].isMember("video")){Storage["meta"]["video"].removeMember("init");}
|
||||||
std::string ret = Storage.toString();
|
ret = Storage.toString();
|
||||||
Storage["log"].null();
|
Storage["log"].null();
|
||||||
stats_mutex.unlock();
|
stats_mutex.unlock();
|
||||||
return ret;
|
return ret;
|
||||||
|
|
|
@ -14,7 +14,7 @@ namespace Buffer{
|
||||||
/// Get a reference to this Stream object.
|
/// Get a reference to this Stream object.
|
||||||
static Stream * get();
|
static Stream * get();
|
||||||
/// Get the current statistics in JSON format.
|
/// Get the current statistics in JSON format.
|
||||||
std::string getStats();
|
std::string & getStats();
|
||||||
/// Get a new DTSC::Ring object for a user.
|
/// Get a new DTSC::Ring object for a user.
|
||||||
DTSC::Ring * getRing();
|
DTSC::Ring * getRing();
|
||||||
/// Drop a DTSC::Ring object.
|
/// Drop a DTSC::Ring object.
|
||||||
|
|
|
@ -103,7 +103,7 @@ namespace Connector_HTTP{
|
||||||
/// Handles internal requests.
|
/// Handles internal requests.
|
||||||
void Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){
|
void Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){
|
||||||
|
|
||||||
std::string url = H.url;
|
std::string url = H.getUrl();
|
||||||
|
|
||||||
if (url == "/crossdomain.xml"){
|
if (url == "/crossdomain.xml"){
|
||||||
H.Clean();
|
H.Clean();
|
||||||
|
@ -114,7 +114,7 @@ namespace Connector_HTTP{
|
||||||
return;
|
return;
|
||||||
}//crossdomain.xml
|
}//crossdomain.xml
|
||||||
|
|
||||||
if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js") || (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(H.url.length() - 3, 3) == ".js")){
|
if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js") || (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js")){
|
||||||
std::string streamname;
|
std::string streamname;
|
||||||
if (url.substr(0, 6) == "/info_"){
|
if (url.substr(0, 6) == "/info_"){
|
||||||
streamname = url.substr(6, url.length() - 9);
|
streamname = url.substr(6, url.length() - 9);
|
||||||
|
@ -293,24 +293,25 @@ namespace Connector_HTTP{
|
||||||
/// - dynamic (request fed from http_dynamic connector)
|
/// - dynamic (request fed from http_dynamic connector)
|
||||||
/// - progressive (request fed from http_progressive connector)
|
/// - progressive (request fed from http_progressive connector)
|
||||||
std::string getHTTPType(HTTP::Parser & H){
|
std::string getHTTPType(HTTP::Parser & H){
|
||||||
if ((H.url.find("f4m") != std::string::npos) || ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos))){
|
std::string url = H.getUrl();
|
||||||
std::string streamname = H.url.substr(1,H.url.find("/",1)-1);
|
if ((url.find("f4m") != std::string::npos) || ((url.find("Seg") != std::string::npos) && (url.find("Frag") != std::string::npos))){
|
||||||
|
std::string streamname = url.substr(1,url.find("/",1)-1);
|
||||||
Util::Stream::sanitizeName(streamname);
|
Util::Stream::sanitizeName(streamname);
|
||||||
H.SetVar("stream", streamname);
|
H.SetVar("stream", streamname);
|
||||||
return "dynamic";
|
return "dynamic";
|
||||||
}
|
}
|
||||||
if (H.url.length() > 4){
|
if (url.length() > 4){
|
||||||
std::string ext = H.url.substr(H.url.length() - 4, 4);
|
std::string ext = url.substr(url.length() - 4, 4);
|
||||||
if (ext == ".flv" || ext == ".mp3"){
|
if (ext == ".flv" || ext == ".mp3"){
|
||||||
std::string streamname = H.url.substr(1,H.url.length() - 5);
|
std::string streamname = url.substr(1,url.length() - 5);
|
||||||
Util::Stream::sanitizeName(streamname);
|
Util::Stream::sanitizeName(streamname);
|
||||||
H.SetVar("stream", streamname);
|
H.SetVar("stream", streamname);
|
||||||
return "progressive";
|
return "progressive";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (H.url == "/crossdomain.xml"){return "internal";}
|
if (url == "/crossdomain.xml"){return "internal";}
|
||||||
if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";}
|
if (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js"){return "internal";}
|
||||||
if (H.url.length() > 9 && H.url.substr(0, 6) == "/info_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";}
|
if (url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js"){return "internal";}
|
||||||
return "none";
|
return "none";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,7 +325,7 @@ namespace Connector_HTTP{
|
||||||
if (Client.Read(conn->Received())){
|
if (Client.Read(conn->Received())){
|
||||||
std::string handler = getHTTPType(Client);
|
std::string handler = getHTTPType(Client);
|
||||||
#if DEBUG >= 4
|
#if DEBUG >= 4
|
||||||
std::cout << "Received request: " << Client.url << " => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl;
|
std::cout << "Received request: " << Client.getUrl() << " => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl;
|
||||||
#endif
|
#endif
|
||||||
if (handler == "none" || handler == "internal"){
|
if (handler == "none" || handler == "internal"){
|
||||||
if (handler == "internal"){
|
if (handler == "internal"){
|
||||||
|
|
|
@ -113,6 +113,7 @@ namespace Connector_HTTP{
|
||||||
/// Main function for Connector_HTTP_Dynamic
|
/// Main function for Connector_HTTP_Dynamic
|
||||||
int Connector_HTTP_Dynamic(Socket::Connection conn){
|
int Connector_HTTP_Dynamic(Socket::Connection conn){
|
||||||
std::string FlashBuf;
|
std::string FlashBuf;
|
||||||
|
int flashbuf_nonempty = 0;
|
||||||
FLV::Tag tmp;//temporary tag, for init data
|
FLV::Tag tmp;//temporary tag, for init data
|
||||||
|
|
||||||
std::queue<std::string> Flash_FragBuffer;//Fragment buffer
|
std::queue<std::string> Flash_FragBuffer;//Fragment buffer
|
||||||
|
@ -139,11 +140,27 @@ namespace Connector_HTTP{
|
||||||
if (conn.spool()){
|
if (conn.spool()){
|
||||||
if (HTTP_R.Read(conn.Received())){
|
if (HTTP_R.Read(conn.Received())){
|
||||||
#if DEBUG >= 4
|
#if DEBUG >= 4
|
||||||
std::cout << "Received request: " << HTTP_R.url << std::endl;
|
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
|
||||||
#endif
|
#endif
|
||||||
conn.setHost(HTTP_R.GetHeader("X-Origin"));
|
conn.setHost(HTTP_R.GetHeader("X-Origin"));
|
||||||
if (HTTP_R.url.find("f4m") == std::string::npos){
|
if (HTTP_R.url.find("f4m") == std::string::npos){
|
||||||
streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
|
streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
|
||||||
|
if (!ss){
|
||||||
|
ss = Util::Stream::getStream(streamname);
|
||||||
|
if (!ss.connected()){
|
||||||
|
#if DEBUG >= 1
|
||||||
|
fprintf(stderr, "Could not connect to server!\n");
|
||||||
|
#endif
|
||||||
|
ss.close();
|
||||||
|
HTTP_S.Clean();
|
||||||
|
HTTP_S.SetBody("No such stream is available on the system. Please try again.\n");
|
||||||
|
conn.Send(HTTP_S.BuildResponse("404", "Not found"));
|
||||||
|
ready4data = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
ss.setBlocking(false);
|
||||||
|
inited = true;
|
||||||
|
}
|
||||||
Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 );
|
Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 );
|
||||||
Quality = Quality.substr(0, Quality.find("Seg"));
|
Quality = Quality.substr(0, Quality.find("Seg"));
|
||||||
temp = HTTP_R.url.find("Seg") + 3;
|
temp = HTTP_R.url.find("Seg") + 3;
|
||||||
|
@ -153,12 +170,27 @@ namespace Connector_HTTP{
|
||||||
#if DEBUG >= 4
|
#if DEBUG >= 4
|
||||||
printf( "Quality: %s, Seg %d Frag %d\n", Quality.c_str(), Segment, ReqFragment);
|
printf( "Quality: %s, Seg %d Frag %d\n", Quality.c_str(), Segment, ReqFragment);
|
||||||
#endif
|
#endif
|
||||||
ss.Send("f " + JSON::Value((long long int)ReqFragment) + "\no \n");
|
std::stringstream sstream;
|
||||||
|
sstream << "f " << ReqFragment << "\no \n";
|
||||||
|
ss.Send(sstream.str().c_str());
|
||||||
ss.flush();
|
ss.flush();
|
||||||
Flash_RequestPending++;
|
Flash_RequestPending++;
|
||||||
}else{
|
}else{
|
||||||
streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
|
streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
|
||||||
pending_manifest = true;
|
if (!Strm.metadata.isNull()){
|
||||||
|
HTTP_S.Clean();
|
||||||
|
HTTP_S.SetHeader("Content-Type","text/xml");
|
||||||
|
HTTP_S.SetHeader("Cache-Control","no-cache");
|
||||||
|
std::string manifest = BuildManifest(streamname, Strm.metadata);
|
||||||
|
HTTP_S.SetBody(manifest);
|
||||||
|
conn.Send(HTTP_S.BuildResponse("200", "OK"));
|
||||||
|
#if DEBUG >= 3
|
||||||
|
printf("Sent manifest\n");
|
||||||
|
#endif
|
||||||
|
pending_manifest = false;
|
||||||
|
}else{
|
||||||
|
pending_manifest = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ready4data = true;
|
ready4data = true;
|
||||||
HTTP_R.Clean(); //clean for any possible next requests
|
HTTP_R.Clean(); //clean for any possible next requests
|
||||||
|
@ -183,6 +215,7 @@ namespace Connector_HTTP{
|
||||||
ready4data = false;
|
ready4data = false;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
ss.setBlocking(false);
|
||||||
#if DEBUG >= 3
|
#if DEBUG >= 3
|
||||||
fprintf(stderr, "Everything connected, starting to send video data...\n");
|
fprintf(stderr, "Everything connected, starting to send video data...\n");
|
||||||
#endif
|
#endif
|
||||||
|
@ -202,7 +235,8 @@ namespace Connector_HTTP{
|
||||||
unsigned int now = time(0);
|
unsigned int now = time(0);
|
||||||
if (now != lastStats){
|
if (now != lastStats){
|
||||||
lastStats = now;
|
lastStats = now;
|
||||||
ss.Send("S "+conn.getStats("HTTP_Dynamic"));
|
ss.Send("S ");
|
||||||
|
ss.Send(conn.getStats("HTTP_Dynamic").c_str());
|
||||||
}
|
}
|
||||||
if (ss.spool() || ss.Received() != ""){
|
if (ss.spool() || ss.Received() != ""){
|
||||||
if (Strm.parsePacket(ss.Received())){
|
if (Strm.parsePacket(ss.Received())){
|
||||||
|
@ -216,7 +250,9 @@ namespace Connector_HTTP{
|
||||||
}
|
}
|
||||||
Strm.metadata["lasttime"] = Strm.getPacket(0)["time"];
|
Strm.metadata["lasttime"] = Strm.getPacket(0)["time"];
|
||||||
}
|
}
|
||||||
tag.DTSCLoader(Strm);
|
if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){
|
||||||
|
tag.DTSCLoader(Strm);
|
||||||
|
}
|
||||||
if (pending_manifest){
|
if (pending_manifest){
|
||||||
HTTP_S.Clean();
|
HTTP_S.Clean();
|
||||||
HTTP_S.SetHeader("Content-Type","text/xml");
|
HTTP_S.SetHeader("Content-Type","text/xml");
|
||||||
|
@ -229,17 +265,18 @@ namespace Connector_HTTP{
|
||||||
#endif
|
#endif
|
||||||
pending_manifest = false;
|
pending_manifest = false;
|
||||||
}
|
}
|
||||||
if (Strm.getPacket(0).isMember("keyframe")){
|
if (Strm.getPacket(0).isMember("keyframe") || Strm.getPacket(0)["datatype"].asString() == "pause_marker"){
|
||||||
if (FlashBuf != ""){
|
if (flashbuf_nonempty){
|
||||||
Flash_FragBuffer.push(FlashBuf);
|
Flash_FragBuffer.push(FlashBuf);
|
||||||
while (Flash_FragBuffer.size() > 2){
|
while (Flash_FragBuffer.size() > 2){
|
||||||
Flash_FragBuffer.pop();
|
Flash_FragBuffer.pop();
|
||||||
}
|
}
|
||||||
#if DEBUG >= 4
|
#if DEBUG >= 4
|
||||||
fprintf(stderr, "Received a fragment. Now %i in buffer.\n", (int)Flash_FragBuffer.size());
|
fprintf(stderr, "Received a %s fragment of %i packets. Now %i in buffer.\n", Strm.getPacket(0)["datatype"].asString().c_str(), flashbuf_nonempty, (int)Flash_FragBuffer.size());
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
FlashBuf.clear();
|
FlashBuf.clear();
|
||||||
|
flashbuf_nonempty = 0;
|
||||||
//fill buffer with init data, if needed.
|
//fill buffer with init data, if needed.
|
||||||
if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){
|
if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){
|
||||||
tmp.DTSCAudioInit(Strm);
|
tmp.DTSCAudioInit(Strm);
|
||||||
|
@ -250,14 +287,31 @@ namespace Connector_HTTP{
|
||||||
FlashBuf.append(tmp.data, tmp.len);
|
FlashBuf.append(tmp.data, tmp.len);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FlashBuf.append(tag.data, tag.len);
|
if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){
|
||||||
|
++flashbuf_nonempty;
|
||||||
|
FlashBuf.append(tag.data, tag.len);
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
if (pending_manifest && !Strm.metadata.isNull()){
|
||||||
|
HTTP_S.Clean();
|
||||||
|
HTTP_S.SetHeader("Content-Type","text/xml");
|
||||||
|
HTTP_S.SetHeader("Cache-Control","no-cache");
|
||||||
|
std::string manifest = BuildManifest(streamname, Strm.metadata);
|
||||||
|
HTTP_S.SetBody(manifest);
|
||||||
|
conn.Send(HTTP_S.BuildResponse("200", "OK"));
|
||||||
|
#if DEBUG >= 3
|
||||||
|
printf("Sent manifest\n");
|
||||||
|
#endif
|
||||||
|
pending_manifest = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!ss.connected()){break;}
|
if (!ss.connected()){break;}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.close();
|
conn.close();
|
||||||
ss.Send("S "+conn.getStats("HTTP_Dynamic"));
|
ss.Send("S ");
|
||||||
|
ss.Send(conn.getStats("HTTP_Dynamic").c_str());
|
||||||
ss.flush();
|
ss.flush();
|
||||||
ss.close();
|
ss.close();
|
||||||
#if DEBUG >= 1
|
#if DEBUG >= 1
|
||||||
|
|
|
@ -35,7 +35,7 @@ namespace Connector_HTTP{
|
||||||
FLV::Tag tag;///< Temporary tag buffer.
|
FLV::Tag tag;///< Temporary tag buffer.
|
||||||
|
|
||||||
unsigned int lastStats = 0;
|
unsigned int lastStats = 0;
|
||||||
unsigned int seek_pos = 0;//seek position in milliseconds
|
unsigned int seek_pos = 0;//seek position in ms
|
||||||
conn.setBlocking(false);//do not block on conn.spool() when no data is available
|
conn.setBlocking(false);//do not block on conn.spool() when no data is available
|
||||||
|
|
||||||
while (conn.connected()){
|
while (conn.connected()){
|
||||||
|
@ -43,14 +43,14 @@ namespace Connector_HTTP{
|
||||||
if (conn.spool()){
|
if (conn.spool()){
|
||||||
if (HTTP_R.Read(conn.Received())){
|
if (HTTP_R.Read(conn.Received())){
|
||||||
#if DEBUG >= 4
|
#if DEBUG >= 4
|
||||||
std::cout << "Received request: " << HTTP_R.url << std::endl;
|
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
|
||||||
#endif
|
#endif
|
||||||
conn.setHost(HTTP_R.GetHeader("X-Origin"));
|
conn.setHost(HTTP_R.GetHeader("X-Origin"));
|
||||||
//we assume the URL is the stream name with a 3 letter extension
|
//we assume the URL is the stream name with a 3 letter extension
|
||||||
streamname = HTTP_R.url;
|
streamname = HTTP_R.getUrl().substr(1);
|
||||||
size_t extDot = streamname.rfind('.');
|
size_t extDot = streamname.rfind('.');
|
||||||
if (extDot != std::string::npos){streamname.resize(extDot);};//strip the extension
|
if (extDot != std::string::npos){streamname.resize(extDot);};//strip the extension
|
||||||
seek_pos = 1000 * atof(HTTP_R.GetVar("start").c_str());//seconds to ms
|
seek_pos = atoi(HTTP_R.GetVar("start").c_str()) * 1000;//seconds to ms
|
||||||
ready4data = true;
|
ready4data = true;
|
||||||
HTTP_R.Clean(); //clean for any possible next requests
|
HTTP_R.Clean(); //clean for any possible next requests
|
||||||
}else{
|
}else{
|
||||||
|
@ -65,7 +65,7 @@ namespace Connector_HTTP{
|
||||||
ss = Util::Stream::getStream(streamname);
|
ss = Util::Stream::getStream(streamname);
|
||||||
if (!ss.connected()){
|
if (!ss.connected()){
|
||||||
#if DEBUG >= 1
|
#if DEBUG >= 1
|
||||||
fprintf(stderr, "Could not connect to server!\n");
|
fprintf(stderr, "Could not connect to server for %s!\n", streamname.c_str());
|
||||||
#endif
|
#endif
|
||||||
ss.close();
|
ss.close();
|
||||||
HTTP_S.Clean();
|
HTTP_S.Clean();
|
||||||
|
@ -77,7 +77,7 @@ namespace Connector_HTTP{
|
||||||
if (seek_pos){
|
if (seek_pos){
|
||||||
std::stringstream cmd;
|
std::stringstream cmd;
|
||||||
cmd << "s " << seek_pos << "\n";
|
cmd << "s " << seek_pos << "\n";
|
||||||
ss.Send(cmd.str());
|
ss.Send(cmd.str().c_str());
|
||||||
}
|
}
|
||||||
#if DEBUG >= 3
|
#if DEBUG >= 3
|
||||||
fprintf(stderr, "Everything connected, starting to send video data...\n");
|
fprintf(stderr, "Everything connected, starting to send video data...\n");
|
||||||
|
@ -89,7 +89,8 @@ namespace Connector_HTTP{
|
||||||
unsigned int now = time(0);
|
unsigned int now = time(0);
|
||||||
if (now != lastStats){
|
if (now != lastStats){
|
||||||
lastStats = now;
|
lastStats = now;
|
||||||
ss.Send("S "+conn.getStats("HTTP_Progressive"));
|
ss.Send("S ");
|
||||||
|
ss.Send(conn.getStats("HTTP_Progressive").c_str());
|
||||||
}
|
}
|
||||||
if (ss.spool() || ss.Received() != ""){
|
if (ss.spool() || ss.Received() != ""){
|
||||||
if (Strm.parsePacket(ss.Received())){
|
if (Strm.parsePacket(ss.Received())){
|
||||||
|
@ -100,34 +101,35 @@ namespace Connector_HTTP{
|
||||||
//HTTP_S.SetHeader("Transfer-Encoding", "chunked");
|
//HTTP_S.SetHeader("Transfer-Encoding", "chunked");
|
||||||
HTTP_S.protocol = "HTTP/1.0";
|
HTTP_S.protocol = "HTTP/1.0";
|
||||||
conn.Send(HTTP_S.BuildResponse("200", "OK"));//no SetBody = unknown length - this is intentional, we will stream the entire file
|
conn.Send(HTTP_S.BuildResponse("200", "OK"));//no SetBody = unknown length - this is intentional, we will stream the entire file
|
||||||
conn.Send(std::string(FLV::Header, 13));//write FLV header
|
conn.Send(FLV::Header, 13);//write FLV header
|
||||||
static FLV::Tag tmp;
|
static FLV::Tag tmp;
|
||||||
//write metadata
|
//write metadata
|
||||||
tmp.DTSCMetaInit(Strm);
|
tmp.DTSCMetaInit(Strm);
|
||||||
conn.Send(std::string(tmp.data, tmp.len));
|
conn.Send(tmp.data, tmp.len);
|
||||||
//write video init data, if needed
|
//write video init data, if needed
|
||||||
if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){
|
if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){
|
||||||
tmp.DTSCVideoInit(Strm);
|
tmp.DTSCVideoInit(Strm);
|
||||||
conn.Send(std::string(tmp.data, tmp.len));
|
conn.Send(tmp.data, tmp.len);
|
||||||
}
|
}
|
||||||
//write audio init data, if needed
|
//write audio init data, if needed
|
||||||
if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){
|
if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){
|
||||||
tmp.DTSCAudioInit(Strm);
|
tmp.DTSCAudioInit(Strm);
|
||||||
conn.Send(std::string(tmp.data, tmp.len));
|
conn.Send(tmp.data, tmp.len);
|
||||||
}
|
}
|
||||||
progressive_has_sent_header = true;
|
progressive_has_sent_header = true;
|
||||||
#if DEBUG >= 1
|
#if DEBUG >= 1
|
||||||
fprintf(stderr, "Sent progressive FLV header\n");
|
fprintf(stderr, "Sent progressive FLV header\n");
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
conn.Send(std::string(tag.data, tag.len));//write the tag contents
|
conn.Send(tag.data, tag.len);//write the tag contents
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!ss.connected()){break;}
|
if (!ss.connected()){break;}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.close();
|
conn.close();
|
||||||
ss.Send("S "+conn.getStats("HTTP_Dynamic"));
|
ss.Send("S ");
|
||||||
|
ss.Send(conn.getStats("HTTP_Dynamic").c_str());
|
||||||
ss.flush();
|
ss.flush();
|
||||||
ss.close();
|
ss.close();
|
||||||
#if DEBUG >= 1
|
#if DEBUG >= 1
|
||||||
|
|
|
@ -36,12 +36,14 @@ int main(int argc, char ** argv) {
|
||||||
lastStats = now;
|
lastStats = now;
|
||||||
std::stringstream st;
|
std::stringstream st;
|
||||||
st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
|
st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
|
||||||
S.Send(st.str());
|
std::string tmp = st.str();
|
||||||
|
S.Send(tmp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
std::stringstream st;
|
std::stringstream st;
|
||||||
st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
|
st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
|
||||||
S.Send(st.str());
|
std::string tmp = st.str();
|
||||||
|
S.Send(tmp);
|
||||||
S.flush();
|
S.flush();
|
||||||
S.close();
|
S.close();
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -32,6 +32,9 @@ namespace Connector_RTMP{
|
||||||
int play_streamid = -1;
|
int play_streamid = -1;
|
||||||
int play_msgtype = -1;
|
int play_msgtype = -1;
|
||||||
|
|
||||||
|
//generic state keeping
|
||||||
|
bool stream_inited = false;///true if init data for audio/video was sent
|
||||||
|
|
||||||
Socket::Connection Socket; ///< Socket connected to user
|
Socket::Connection Socket; ///< Socket connected to user
|
||||||
Socket::Connection SS; ///< Socket connected to server
|
Socket::Connection SS; ///< Socket connected to server
|
||||||
std::string streamname; ///< Stream that will be opened
|
std::string streamname; ///< Stream that will be opened
|
||||||
|
@ -48,7 +51,6 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
||||||
Socket.setBlocking(false);
|
Socket.setBlocking(false);
|
||||||
FLV::Tag tag, init_tag;
|
FLV::Tag tag, init_tag;
|
||||||
DTSC::Stream Strm;
|
DTSC::Stream Strm;
|
||||||
bool stream_inited = false;//true if init data for audio/video was sent
|
|
||||||
|
|
||||||
while (Socket.Received().size() < 1537 && Socket.connected()){Socket.spool(); usleep(5000);}
|
while (Socket.Received().size() < 1537 && Socket.connected()){Socket.spool(); usleep(5000);}
|
||||||
RTMPStream::handshake_in = Socket.Received().substr(0, 1537);
|
RTMPStream::handshake_in = Socket.Received().substr(0, 1537);
|
||||||
|
@ -101,7 +103,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
||||||
unsigned int now = time(0);
|
unsigned int now = time(0);
|
||||||
if (now != lastStats){
|
if (now != lastStats){
|
||||||
lastStats = now;
|
lastStats = now;
|
||||||
SS.Send("S "+Socket.getStats("RTMP"));
|
SS.Send("S ");
|
||||||
|
SS.Send(Socket.getStats("RTMP").c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (SS.spool()){
|
if (SS.spool()){
|
||||||
|
@ -167,7 +170,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Socket.close();
|
Socket.close();
|
||||||
SS.Send("S "+Socket.getStats("RTMP"));
|
SS.Send("S ");
|
||||||
|
SS.Send(Socket.getStats("RTMP").c_str());
|
||||||
SS.flush();
|
SS.flush();
|
||||||
SS.close();
|
SS.close();
|
||||||
#if DEBUG >= 1
|
#if DEBUG >= 1
|
||||||
|
@ -278,7 +282,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
|
||||||
if (counter > 8){
|
if (counter > 8){
|
||||||
sending = true;
|
sending = true;
|
||||||
SS.Send(meta_out.toNetPacked());
|
SS.Send(meta_out.toNetPacked());
|
||||||
SS.Send(prebuffer.str());//write buffer
|
SS.Send(prebuffer.str().c_str());//write buffer
|
||||||
prebuffer.str("");//clear buffer
|
prebuffer.str("");//clear buffer
|
||||||
SS.Send(pack_out.toNetPacked());
|
SS.Send(pack_out.toNetPacked());
|
||||||
}else{
|
}else{
|
||||||
|
@ -451,7 +455,9 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
||||||
Socket.close();//disconnect user
|
Socket.close();//disconnect user
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SS.Send("P "+Socket.getHost()+'\n');
|
SS.Send("P ");
|
||||||
|
SS.Send(Socket.getHost().c_str());
|
||||||
|
SS.Send("\n");
|
||||||
nostats = true;
|
nostats = true;
|
||||||
#if DEBUG >= 4
|
#if DEBUG >= 4
|
||||||
fprintf(stderr, "Connected to buffer, starting to send data...\n");
|
fprintf(stderr, "Connected to buffer, starting to send data...\n");
|
||||||
|
@ -503,7 +509,9 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
||||||
play_msgtype = messagetype;
|
play_msgtype = messagetype;
|
||||||
play_streamid = stream_id;
|
play_streamid = stream_id;
|
||||||
stream_inited = false;
|
stream_inited = false;
|
||||||
SS.Send("seek " + JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString() + "\n");
|
SS.Send("s ");
|
||||||
|
SS.Send(JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString().c_str());
|
||||||
|
SS.Send("\n");
|
||||||
return;
|
return;
|
||||||
}//seek
|
}//seek
|
||||||
|
|
||||||
|
|
143
src/player.cpp
143
src/player.cpp
|
@ -1,6 +1,10 @@
|
||||||
/// \file player.cpp
|
/// \file player.cpp
|
||||||
/// Holds all code for the MistPlayer application used for VoD streams.
|
/// Holds all code for the MistPlayer application used for VoD streams.
|
||||||
|
|
||||||
|
#if DEBUG >= 4
|
||||||
|
#include <iostream>//for std::cerr
|
||||||
|
#endif
|
||||||
|
|
||||||
#include <stdio.h> //for fileno
|
#include <stdio.h> //for fileno
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <mist/dtsc.h>
|
#include <mist/dtsc.h>
|
||||||
|
@ -23,14 +27,17 @@ int main(int argc, char** argv){
|
||||||
int playing = 0;
|
int playing = 0;
|
||||||
|
|
||||||
DTSC::File source = DTSC::File(conf.getString("filename"));
|
DTSC::File source = DTSC::File(conf.getString("filename"));
|
||||||
Socket::Connection in_out = Socket::Connection(fileno(stdin), fileno(stdout));
|
Socket::Connection in_out = Socket::Connection(fileno(stdout), fileno(stdin));
|
||||||
std::string meta_str = source.getHeader();
|
std::string meta_str = source.getHeader();
|
||||||
|
JSON::Value pausemark;
|
||||||
|
pausemark["datatype"] = "pause_marker";
|
||||||
|
pausemark["time"] = (long long int)0;
|
||||||
|
|
||||||
//send the header
|
//send the header
|
||||||
{
|
{
|
||||||
in_out.Send("DTSC");
|
in_out.Send("DTSC");
|
||||||
unsigned int size = htonl(meta_str.size());
|
unsigned int size = htonl(meta_str.size());
|
||||||
in_out.Send(std::string((char*)&size, (size_t)4));
|
in_out.Send((char*)&size, 4);
|
||||||
in_out.Send(meta_str);
|
in_out.Send(meta_str);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,60 +47,104 @@ int main(int argc, char** argv){
|
||||||
long long now, timeDiff = 0, lastTime = 0;
|
long long now, timeDiff = 0, lastTime = 0;
|
||||||
|
|
||||||
while (in_out.connected()){
|
while (in_out.connected()){
|
||||||
if (in_out.spool() && in_out.Received().find('\n') != std::string::npos){
|
if (in_out.spool()){
|
||||||
std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n'));
|
while (in_out.Received().find('\n') != std::string::npos){
|
||||||
in_out.Received().erase(0, in_out.Received().find('\n')+1);
|
std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n'));
|
||||||
if (cmd != ""){
|
in_out.Received().erase(0, in_out.Received().find('\n')+1);
|
||||||
switch (cmd[0]){
|
if (cmd != ""){
|
||||||
case 'P':{ //Push
|
switch (cmd[0]){
|
||||||
in_out.close();//pushing to VoD makes no sense
|
case 'P':{ //Push
|
||||||
} break;
|
#if DEBUG >= 4
|
||||||
case 'S':{ //Stats
|
std::cerr << "Received push - ignoring (" << cmd << ")" << std::endl;
|
||||||
/// \todo Parse stats command properly.
|
#endif
|
||||||
/* Stats(cmd.substr(2)); */
|
in_out.close();//pushing to VoD makes no sense
|
||||||
} break;
|
} break;
|
||||||
case 's':{ //second-seek
|
case 'S':{ //Stats
|
||||||
int second = JSON::Value(cmd.substr(2)).asInt();
|
#if DEBUG >= 4
|
||||||
double keyms = meta["video"]["keyms"].asInt();
|
//std::cerr << "Received stats - ignoring (" << cmd << ")" << std::endl;
|
||||||
if (keyms <= 0){keyms = 2000;}
|
#endif
|
||||||
source.seek_frame(second / (keyms / 1000.0));
|
/// \todo Parse stats command properly.
|
||||||
} break;
|
/* Stats(cmd.substr(2)); */
|
||||||
case 'f':{ //frame-seek
|
} break;
|
||||||
source.seek_frame(JSON::Value(cmd.substr(2)).asInt());
|
case 's':{ //second-seek
|
||||||
} break;
|
#if DEBUG >= 4
|
||||||
case 'p':{ //play
|
std::cerr << "Received ms-seek (" << cmd << ")" << std::endl;
|
||||||
playing = -1;
|
#endif
|
||||||
} break;
|
int ms = JSON::Value(cmd.substr(2)).asInt();
|
||||||
case 'o':{ //once-play
|
bool ret = source.seek_time(ms);
|
||||||
if (playing < 0){playing = 0;}
|
#if DEBUG >= 4
|
||||||
++playing;
|
std::cerr << "Second-seek completed (time " << ms << "ms) " << ret << std::endl;
|
||||||
} break;
|
#endif
|
||||||
case 'q':{ //quit-playing
|
} break;
|
||||||
playing = 0;
|
case 'f':{ //frame-seek
|
||||||
} break;
|
#if DEBUG >= 4
|
||||||
|
std::cerr << "Received frame-seek (" << cmd << ")" << std::endl;
|
||||||
|
#endif
|
||||||
|
bool ret = source.seek_frame(JSON::Value(cmd.substr(2)).asInt());
|
||||||
|
#if DEBUG >= 4
|
||||||
|
std::cerr << "Frame-seek completed " << ret << std::endl;
|
||||||
|
#endif
|
||||||
|
} break;
|
||||||
|
case 'p':{ //play
|
||||||
|
#if DEBUG >= 4
|
||||||
|
std::cerr << "Received play" << std::endl;
|
||||||
|
#endif
|
||||||
|
playing = -1;
|
||||||
|
in_out.setBlocking(false);
|
||||||
|
} break;
|
||||||
|
case 'o':{ //once-play
|
||||||
|
#if DEBUG >= 4
|
||||||
|
std::cerr << "Received once-play" << std::endl;
|
||||||
|
#endif
|
||||||
|
if (playing <= 0){playing = 1;}
|
||||||
|
++playing;
|
||||||
|
in_out.setBlocking(false);
|
||||||
|
} break;
|
||||||
|
case 'q':{ //quit-playing
|
||||||
|
#if DEBUG >= 4
|
||||||
|
std::cerr << "Received quit-playing" << std::endl;
|
||||||
|
#endif
|
||||||
|
playing = 0;
|
||||||
|
in_out.setBlocking(true);
|
||||||
|
} break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (playing != 0){
|
if (playing != 0){
|
||||||
now = getNowMS();
|
now = getNowMS();
|
||||||
if (now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) {
|
if (playing > 0 || now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) {
|
||||||
std::string packet = source.getPacket();
|
source.seekNext();
|
||||||
last_pack = JSON::fromDTMI(packet);
|
lastTime = source.getJSON()["time"].asInt();
|
||||||
lastTime = last_pack["time"].asInt();
|
if ((now - timeDiff - lastTime) > 5000 || (now - timeDiff - lastTime < -5000)){
|
||||||
if ((now - timeDiff - lastTime) > 15000 || (now - timeDiff - lastTime < -15000)){
|
|
||||||
timeDiff = now - lastTime;
|
timeDiff = now - lastTime;
|
||||||
}
|
}
|
||||||
//insert proper header for this type of data
|
if (source.getJSON().isMember("keyframe")){
|
||||||
in_out.Send("DTPD");
|
if (playing > 0){--playing;}
|
||||||
//insert the packet length
|
if (playing == 0){
|
||||||
unsigned int size = htonl(packet.size());
|
#if DEBUG >= 4
|
||||||
in_out.Send(std::string((char*)&size, (size_t)4));
|
std::cerr << "Sending pause_marker" << std::endl;
|
||||||
in_out.Send(packet);
|
#endif
|
||||||
|
pausemark["time"] = (long long int)now;
|
||||||
|
pausemark.toPacked();
|
||||||
|
in_out.Send(pausemark.toNetPacked());
|
||||||
|
in_out.flush();
|
||||||
|
in_out.setBlocking(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (playing != 0){
|
||||||
|
//insert proper header for this type of data
|
||||||
|
in_out.Send("DTPD");
|
||||||
|
//insert the packet length
|
||||||
|
unsigned int size = htonl(source.getPacket().size());
|
||||||
|
in_out.Send((char*)&size, 4);
|
||||||
|
in_out.Send(source.getPacket());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
usleep(std::min(14999LL, lastTime - (now - timeDiff)) * 1000);
|
usleep(std::min(10000LL, lastTime - (now - timeDiff)) * 1000);
|
||||||
}
|
}
|
||||||
if (playing > 0){--playing;}
|
|
||||||
}
|
}
|
||||||
|
usleep(10000);//sleep 10ms
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue