Splitted stuff
This commit is contained in:
parent
4020835ed5
commit
032f3e7377
4 changed files with 516 additions and 37 deletions
|
@ -11,7 +11,31 @@ RELEASE ?= "Generic_`getconf LONG_BIT`"
|
||||||
|
|
||||||
AM_CPPFLAGS = $(global_CFLAGS) $(MIST_CFLAGS) -DRELEASE=\"$(RELEASE)\"
|
AM_CPPFLAGS = $(global_CFLAGS) $(MIST_CFLAGS) -DRELEASE=\"$(RELEASE)\"
|
||||||
LDADD = $(MIST_LIBS)
|
LDADD = $(MIST_LIBS)
|
||||||
bin_PROGRAMS=MistBuffer MistController MistConnRAW MistConnRTMP MistConnHTTP MistConnHTTPProgressive MistConnHTTPDynamic MistConnHTTPSmooth MistConnHTTPLive MistConnTS MistPlayer MistDTSC2FLV MistFLV2DTSC MistDTSCFix MistDTSC2TS MistOGG2DTSC MistDTSC2OGG MistAnalyserRTMP MistAnalyserFLV MistAnalyserDTSC MistAnalyserAMF MistAnalyserMP4 MistAnalyserOGG
|
bin_PROGRAMS=MistBuffer
|
||||||
|
bin_PROGRAMS+=MistController
|
||||||
|
bin_PROGRAMS+=MistConnRAW
|
||||||
|
bin_PROGRAMS+=MistConnRTMP
|
||||||
|
bin_PROGRAMS+=MistConnHTTP
|
||||||
|
bin_PROGRAMS+=MistConnHTTPProgressiveFLV
|
||||||
|
bin_PROGRAMS+=MistConnHTTPProgressiveMP3
|
||||||
|
bin_PROGRAMS+=MistConnHTTPProgressiveOGG
|
||||||
|
bin_PROGRAMS+=MistConnHTTPDynamic
|
||||||
|
bin_PROGRAMS+=MistConnHTTPSmooth
|
||||||
|
bin_PROGRAMS+=MistConnHTTPLive
|
||||||
|
bin_PROGRAMS+=MistConnTS
|
||||||
|
bin_PROGRAMS+=MistPlayer
|
||||||
|
bin_PROGRAMS+=MistDTSC2FLV
|
||||||
|
bin_PROGRAMS+=MistFLV2DTSC
|
||||||
|
bin_PROGRAMS+=MistDTSCFix
|
||||||
|
bin_PROGRAMS+=MistDTSCMerge
|
||||||
|
bin_PROGRAMS+=MistDTSC2TS
|
||||||
|
bin_PROGRAMS+=MistSRT2DTSC
|
||||||
|
bin_PROGRAMS+=MistAnalyserRTMP
|
||||||
|
bin_PROGRAMS+=MistAnalyserFLV
|
||||||
|
bin_PROGRAMS+=MistAnalyserDTSC
|
||||||
|
bin_PROGRAMS+=MistAnalyserAMF
|
||||||
|
bin_PROGRAMS+=MistAnalyserMP4
|
||||||
|
bin_PROGRAMS+=MistInfo
|
||||||
|
|
||||||
#buffer folder (MistBuffer, MistPlayer)
|
#buffer folder (MistBuffer, MistPlayer)
|
||||||
MistBuffer_SOURCES=buffer/buffer.cpp buffer/buffer_user.h buffer/buffer_user.cpp buffer/buffer_stream.h buffer/buffer_stream.cpp tinythread.cpp tinythread.h ../VERSION
|
MistBuffer_SOURCES=buffer/buffer.cpp buffer/buffer_user.h buffer/buffer_user.cpp buffer/buffer_stream.h buffer/buffer_stream.cpp tinythread.cpp tinythread.h ../VERSION
|
||||||
|
@ -26,7 +50,9 @@ MistConnRAW_SOURCES=connectors/conn_raw.cpp ../VERSION
|
||||||
MistConnRTMP_SOURCES=connectors/conn_rtmp.cpp ../VERSION
|
MistConnRTMP_SOURCES=connectors/conn_rtmp.cpp ../VERSION
|
||||||
MistConnHTTP_SOURCES=connectors/conn_http.cpp tinythread.cpp tinythread.h ../VERSION connectors/embed.js.h connectors/icon.h
|
MistConnHTTP_SOURCES=connectors/conn_http.cpp tinythread.cpp tinythread.h ../VERSION connectors/embed.js.h connectors/icon.h
|
||||||
MistConnHTTP_LDADD=$(MIST_LIBS) -lpthread
|
MistConnHTTP_LDADD=$(MIST_LIBS) -lpthread
|
||||||
MistConnHTTPProgressive_SOURCES=connectors/conn_http_progressive.cpp ../VERSION
|
MistConnHTTPProgressiveFLV_SOURCES=connectors/conn_http_progressive_flv.cpp ../VERSION
|
||||||
|
MistConnHTTPProgressiveMP3_SOURCES=connectors/conn_http_progressive_mp3.cpp ../VERSION
|
||||||
|
MistConnHTTPProgressiveOGG_SOURCES=connectors/conn_http_progressive_ogg.cpp ../VERSION
|
||||||
MistConnHTTPDynamic_SOURCES=connectors/conn_http_dynamic.cpp ../VERSION
|
MistConnHTTPDynamic_SOURCES=connectors/conn_http_dynamic.cpp ../VERSION
|
||||||
MistConnHTTPSmooth_SOURCES=connectors/conn_http_smooth.cpp ../VERSION
|
MistConnHTTPSmooth_SOURCES=connectors/conn_http_smooth.cpp ../VERSION
|
||||||
MistConnHTTPLive_SOURCES=connectors/conn_http_live.cpp ../VERSION
|
MistConnHTTPLive_SOURCES=connectors/conn_http_live.cpp ../VERSION
|
||||||
|
@ -51,6 +77,7 @@ MistAnalyserMP4_SOURCES=analysers/mp4_analyser.cpp
|
||||||
MistAnalyserOGG_SOURCES=analysers/ogg_analyser.cpp
|
MistAnalyserOGG_SOURCES=analysers/ogg_analyser.cpp
|
||||||
MistInfo_SOURCES=info.cpp
|
MistInfo_SOURCES=info.cpp
|
||||||
|
|
||||||
|
|
||||||
connectors/embed.js.h: connectors/embed.js
|
connectors/embed.js.h: connectors/embed.js
|
||||||
$(CLOSURE) $(srcdir)/connectors/embed.js > embed.min.js
|
$(CLOSURE) $(srcdir)/connectors/embed.js > embed.min.js
|
||||||
xxd -i embed.min.js | sed s/_min_/_/g > $(srcdir)/connectors/embed.js.h
|
xxd -i embed.min.js | sed s/_min_/_/g > $(srcdir)/connectors/embed.js.h
|
||||||
|
|
|
@ -11,7 +11,6 @@
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/wait.h>
|
#include <sys/wait.h>
|
||||||
#include <getopt.h>
|
|
||||||
|
|
||||||
#include <mist/socket.h>
|
#include <mist/socket.h>
|
||||||
#include <mist/http_parser.h>
|
#include <mist/http_parser.h>
|
||||||
|
@ -69,9 +68,6 @@ namespace Connector_HTTP {
|
||||||
streamname = HTTP_R.getUrl().substr(1);
|
streamname = HTTP_R.getUrl().substr(1);
|
||||||
size_t extDot = streamname.rfind('.');
|
size_t extDot = streamname.rfind('.');
|
||||||
if (extDot != std::string::npos){
|
if (extDot != std::string::npos){
|
||||||
if (streamname.substr(extDot + 1) == "mp3"){
|
|
||||||
isMP3 = true;
|
|
||||||
}
|
|
||||||
streamname.resize(extDot);
|
streamname.resize(extDot);
|
||||||
}; //strip the extension
|
}; //strip the extension
|
||||||
int start = 0;
|
int start = 0;
|
||||||
|
@ -126,14 +122,14 @@ namespace Connector_HTTP {
|
||||||
}
|
}
|
||||||
int byterate = 0;
|
int byterate = 0;
|
||||||
for (JSON::ObjIter objIt = Strm.metadata["tracks"].ObjBegin(); objIt != Strm.metadata["tracks"].ObjEnd(); objIt++){
|
for (JSON::ObjIter objIt = Strm.metadata["tracks"].ObjBegin(); objIt != Strm.metadata["tracks"].ObjEnd(); objIt++){
|
||||||
if (videoID == -1 && objIt->second["type"].asStringRef() == "video"){
|
if (videoID == -1 && objIt->second["type"].asString() == "video"){
|
||||||
videoID = objIt->second["trackid"].asInt();
|
videoID = objIt->second["trackid"].asInt();
|
||||||
}
|
}
|
||||||
if (audioID == -1 && objIt->second["type"].asStringRef() == "audio"){
|
if (audioID == -1 && objIt->second["type"].asString() == "audio"){
|
||||||
audioID = objIt->second["trackid"].asInt();
|
audioID = objIt->second["trackid"].asInt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (videoID != -1 && !isMP3){
|
if (videoID != -1){
|
||||||
byterate += Strm.getTrackById(videoID)["bps"].asInt();
|
byterate += Strm.getTrackById(videoID)["bps"].asInt();
|
||||||
}
|
}
|
||||||
if (audioID != -1){
|
if (audioID != -1){
|
||||||
|
@ -162,40 +158,28 @@ namespace Connector_HTTP {
|
||||||
while (Strm.parsePacket(ss.Received())){
|
while (Strm.parsePacket(ss.Received())){
|
||||||
if ( !progressive_has_sent_header){
|
if ( !progressive_has_sent_header){
|
||||||
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
|
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
|
||||||
if (!isMP3){
|
HTTP_S.SetHeader("Content-Type", "video/x-flv"); //Send the correct content-type for FLV files
|
||||||
HTTP_S.SetHeader("Content-Type", "video/x-flv"); //Send the correct content-type for FLV files
|
|
||||||
}else{
|
|
||||||
HTTP_S.SetHeader("Content-Type", "audio/mpeg"); //Send the correct content-type for MP3 files
|
|
||||||
}
|
|
||||||
//HTTP_S.SetHeader("Transfer-Encoding", "chunked");
|
//HTTP_S.SetHeader("Transfer-Encoding", "chunked");
|
||||||
HTTP_S.protocol = "HTTP/1.0";
|
HTTP_S.protocol = "HTTP/1.0";
|
||||||
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); //no SetBody = unknown length - this is intentional, we will stream the entire file
|
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); //no SetBody = unknown length - this is intentional, we will stream the entire file
|
||||||
if ( !isMP3){
|
conn.SendNow(FLV::Header, 13); //write FLV header
|
||||||
conn.SendNow(FLV::Header, 13); //write FLV header
|
//write metadata
|
||||||
//write metadata
|
tag.DTSCMetaInit(Strm, Strm.getTrackById(videoID), Strm.getTrackById(audioID));
|
||||||
tag.DTSCMetaInit(Strm, Strm.getTrackById(videoID), Strm.getTrackById(audioID));
|
conn.SendNow(tag.data, tag.len);
|
||||||
|
//write video init data, if needed
|
||||||
|
if (videoID != -1 && Strm.getTrackById(videoID).isMember("init")){
|
||||||
|
tag.DTSCVideoInit(Strm.getTrackById(videoID));
|
||||||
|
conn.SendNow(tag.data, tag.len);
|
||||||
|
}
|
||||||
|
//write audio init data, if needed
|
||||||
|
if (audioID != -1 && Strm.getTrackById(audioID).isMember("init")){
|
||||||
|
tag.DTSCAudioInit(Strm.getTrackById(audioID));
|
||||||
conn.SendNow(tag.data, tag.len);
|
conn.SendNow(tag.data, tag.len);
|
||||||
//write video init data, if needed
|
|
||||||
if (videoID != -1 && Strm.getTrackById(videoID).isMember("init")){
|
|
||||||
tag.DTSCVideoInit(Strm.getTrackById(videoID));
|
|
||||||
conn.SendNow(tag.data, tag.len);
|
|
||||||
}
|
|
||||||
//write audio init data, if needed
|
|
||||||
if (audioID != -1 && Strm.getTrackById(audioID).isMember("init")){
|
|
||||||
tag.DTSCAudioInit(Strm.getTrackById(audioID));
|
|
||||||
conn.SendNow(tag.data, tag.len);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
progressive_has_sent_header = true;
|
progressive_has_sent_header = true;
|
||||||
}
|
}
|
||||||
if ( !isMP3){
|
tag.DTSCLoader(Strm);
|
||||||
tag.DTSCLoader(Strm);
|
conn.SendNow(tag.data, tag.len); //write the tag contents
|
||||||
conn.SendNow(tag.data, tag.len); //write the tag contents
|
|
||||||
}else{
|
|
||||||
if(Strm.lastType() == DTSC::AUDIO){
|
|
||||||
conn.SendNow(Strm.lastData()); //write the MP3 contents
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
Util::sleep(1);
|
Util::sleep(1);
|
||||||
|
@ -232,7 +216,7 @@ int main(int argc, char ** argv){
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
Socket::Server server_socket = Socket::Server("/tmp/mist/http_progressive");
|
Socket::Server server_socket = Socket::Server("/tmp/mist/http_progressive_flv");
|
||||||
if ( !server_socket.connected()){
|
if ( !server_socket.connected()){
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
222
src/connectors/conn_http_progressive_mp3.cpp
Normal file
222
src/connectors/conn_http_progressive_mp3.cpp
Normal file
|
@ -0,0 +1,222 @@
|
||||||
|
///\file conn_http_progressive.cpp
|
||||||
|
///\brief Contains the main code for the HTTP Progressive Connector
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <queue>
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cmath>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/wait.h>
|
||||||
|
|
||||||
|
#include <mist/socket.h>
|
||||||
|
#include <mist/http_parser.h>
|
||||||
|
#include <mist/dtsc.h>
|
||||||
|
#include <mist/flv_tag.h>
|
||||||
|
#include <mist/amf.h>
|
||||||
|
#include <mist/config.h>
|
||||||
|
#include <mist/stream.h>
|
||||||
|
#include <mist/timing.h>
|
||||||
|
|
||||||
|
///\brief Holds everything unique to HTTP Connectors.
|
||||||
|
namespace Connector_HTTP {
|
||||||
|
///\brief Main function for the HTTP Progressive Connector
|
||||||
|
///\param conn A socket describing the connection the client.
|
||||||
|
///\return The exit code of the connector.
|
||||||
|
int progressiveConnector(Socket::Connection conn){
|
||||||
|
bool progressive_has_sent_header = false;//Indicates whether we have sent a header.
|
||||||
|
bool ready4data = false; //Set to true when streaming is to begin.
|
||||||
|
DTSC::Stream Strm; //Incoming stream buffer.
|
||||||
|
HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender.
|
||||||
|
bool inited = false;//Whether the stream is initialized
|
||||||
|
Socket::Connection ss( -1);//The Stream Socket, used to connect to the desired stream.
|
||||||
|
std::string streamname;//Will contain the name of the stream.
|
||||||
|
FLV::Tag tag;//Temporary tag buffer.
|
||||||
|
|
||||||
|
unsigned int lastStats = 0;//Indicates the last time that we have sent stats to the server socket.
|
||||||
|
unsigned int seek_sec = 0;//Seek position in ms
|
||||||
|
unsigned int seek_byte = 0;//Seek position in bytes
|
||||||
|
|
||||||
|
int videoID = -1;
|
||||||
|
int audioID = -1;
|
||||||
|
|
||||||
|
while (conn.connected()){
|
||||||
|
//Only attempt to parse input when not yet init'ed.
|
||||||
|
if ( !inited){
|
||||||
|
if (conn.Received().size() || conn.spool()){
|
||||||
|
//make sure it ends in a \n
|
||||||
|
if ( *(conn.Received().get().rbegin()) != '\n'){
|
||||||
|
std::string tmp = conn.Received().get();
|
||||||
|
conn.Received().get().clear();
|
||||||
|
if (conn.Received().size()){
|
||||||
|
conn.Received().get().insert(0, tmp);
|
||||||
|
}else{
|
||||||
|
conn.Received().append(tmp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (HTTP_R.Read(conn.Received().get())){
|
||||||
|
#if DEBUG >= 5
|
||||||
|
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
|
||||||
|
#endif
|
||||||
|
conn.setHost(HTTP_R.GetHeader("X-Origin"));
|
||||||
|
//we assume the URL is the stream name with a 3 letter extension
|
||||||
|
streamname = HTTP_R.getUrl().substr(1);
|
||||||
|
size_t extDot = streamname.rfind('.');
|
||||||
|
if (extDot != std::string::npos){
|
||||||
|
streamname.resize(extDot);
|
||||||
|
}; //strip the extension
|
||||||
|
int start = 0;
|
||||||
|
if ( !HTTP_R.GetVar("start").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("start").c_str());
|
||||||
|
}
|
||||||
|
if ( !HTTP_R.GetVar("starttime").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("starttime").c_str());
|
||||||
|
}
|
||||||
|
if ( !HTTP_R.GetVar("apstart").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("apstart").c_str());
|
||||||
|
}
|
||||||
|
if ( !HTTP_R.GetVar("ec_seek").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("ec_seek").c_str());
|
||||||
|
}
|
||||||
|
if ( !HTTP_R.GetVar("fs").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("fs").c_str());
|
||||||
|
}
|
||||||
|
//under 3 hours we assume seconds, otherwise byte position
|
||||||
|
if (start < 10800){
|
||||||
|
seek_sec = start * 1000; //ms, not s
|
||||||
|
}else{
|
||||||
|
seek_byte = start; //divide by 1mbit, then *1000 for ms.
|
||||||
|
}
|
||||||
|
ready4data = true;
|
||||||
|
HTTP_R.Clean(); //clean for any possible next requests
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ready4data){
|
||||||
|
if ( !inited){
|
||||||
|
//we are ready, connect the socket!
|
||||||
|
ss = Util::Stream::getStream(streamname);
|
||||||
|
if ( !ss.connected()){
|
||||||
|
#if DEBUG >= 1
|
||||||
|
fprintf(stderr, "Could not connect to server for %s!\n", streamname.c_str());
|
||||||
|
#endif
|
||||||
|
ss.close();
|
||||||
|
HTTP_S.Clean();
|
||||||
|
HTTP_S.SetBody("No such stream is available on the system. Please try again.\n");
|
||||||
|
conn.SendNow(HTTP_S.BuildResponse("404", "Not found"));
|
||||||
|
ready4data = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
//wait until we have a header
|
||||||
|
while ( !Strm.metadata && ss.connected()){
|
||||||
|
if (ss.spool()){
|
||||||
|
Strm.parsePacket(ss.Received()); //read the metadata
|
||||||
|
}else{
|
||||||
|
Util::sleep(5);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int byterate = 0;
|
||||||
|
for (JSON::ObjIter objIt = Strm.metadata["tracks"].ObjBegin(); objIt != Strm.metadata["tracks"].ObjEnd(); objIt++){
|
||||||
|
if (videoID == -1 && objIt->second["type"].asString() == "video"){
|
||||||
|
videoID = objIt->second["trackid"].asInt();
|
||||||
|
}
|
||||||
|
if (audioID == -1 && objIt->second["type"].asString() == "audio"){
|
||||||
|
audioID = objIt->second["trackid"].asInt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (audioID != -1){
|
||||||
|
byterate += Strm.getTrackById(audioID)["bps"].asInt();
|
||||||
|
}
|
||||||
|
if ( !byterate){byterate = 1;}
|
||||||
|
seek_sec = (seek_byte / byterate) * 1000;
|
||||||
|
std::stringstream cmd;
|
||||||
|
cmd << "t";
|
||||||
|
if (videoID != -1){
|
||||||
|
cmd << " " << videoID;
|
||||||
|
}
|
||||||
|
if (audioID != -1){
|
||||||
|
cmd << " " << audioID;
|
||||||
|
}
|
||||||
|
cmd << "\ns " << seek_sec << "\np\n";
|
||||||
|
ss.SendNow(cmd.str().c_str(), cmd.str().size());
|
||||||
|
inited = true;
|
||||||
|
}
|
||||||
|
unsigned int now = Util::epoch();
|
||||||
|
if (now != lastStats){
|
||||||
|
lastStats = now;
|
||||||
|
ss.SendNow(conn.getStats("HTTP_Progressive").c_str());
|
||||||
|
}
|
||||||
|
if (ss.spool()){
|
||||||
|
while (Strm.parsePacket(ss.Received())){
|
||||||
|
if ( !progressive_has_sent_header){
|
||||||
|
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
|
||||||
|
HTTP_S.SetHeader("Content-Type", "audio/mpeg"); //Send the correct content-type for MP3 files
|
||||||
|
//HTTP_S.SetHeader("Transfer-Encoding", "chunked");
|
||||||
|
HTTP_S.protocol = "HTTP/1.0";
|
||||||
|
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); //no SetBody = unknown length - this is intentional, we will stream the entire file
|
||||||
|
progressive_has_sent_header = true;
|
||||||
|
}
|
||||||
|
if(Strm.lastType() == DTSC::AUDIO){
|
||||||
|
conn.SendNow(Strm.lastData()); //write the MP3 contents
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
Util::sleep(1);
|
||||||
|
}
|
||||||
|
if ( !ss.connected()){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
conn.close();
|
||||||
|
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
|
||||||
|
ss.close();
|
||||||
|
return 0;
|
||||||
|
} //Progressive_Connector main function
|
||||||
|
|
||||||
|
} //Connector_HTTP namespace
|
||||||
|
|
||||||
|
///\brief The standard process-spawning main function.
|
||||||
|
int main(int argc, char ** argv){
|
||||||
|
Util::Config conf(argv[0], PACKAGE_VERSION);
|
||||||
|
JSON::Value capa;
|
||||||
|
capa["desc"] = "Enables HTTP protocol progressive streaming.";
|
||||||
|
capa["deps"] = "HTTP";
|
||||||
|
capa["url_rel"] = "/$.mp3";
|
||||||
|
capa["url_match"] = "/$.mp3";
|
||||||
|
capa["url_handler"] = "http";
|
||||||
|
capa["url_type"] = "mp3";
|
||||||
|
capa["socket"] = "http_progressive_mp3";
|
||||||
|
conf.addBasicConnectorOptions(capa);
|
||||||
|
conf.parseArgs(argc, argv);
|
||||||
|
|
||||||
|
if (conf.getBool("json")){
|
||||||
|
std::cout << capa.toString() << std::endl;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
Socket::Server server_socket = Socket::Server("/tmp/mist/http_progressive_mp3");
|
||||||
|
if ( !server_socket.connected()){
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
conf.activate();
|
||||||
|
|
||||||
|
while (server_socket.connected() && conf.is_active){
|
||||||
|
Socket::Connection S = server_socket.accept();
|
||||||
|
if (S.connected()){ //check if the new connection is valid
|
||||||
|
pid_t myid = fork();
|
||||||
|
if (myid == 0){ //if new child, start MAINHANDLER
|
||||||
|
return Connector_HTTP::progressiveConnector(S);
|
||||||
|
}else{ //otherwise, do nothing or output debugging text
|
||||||
|
#if DEBUG >= 5
|
||||||
|
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} //while connected
|
||||||
|
server_socket.close();
|
||||||
|
return 0;
|
||||||
|
} //main
|
246
src/connectors/conn_http_progressive_ogg.cpp
Normal file
246
src/connectors/conn_http_progressive_ogg.cpp
Normal file
|
@ -0,0 +1,246 @@
|
||||||
|
///\file conn_http_progressive.cpp
|
||||||
|
///\brief Contains the main code for the HTTP Progressive Connector
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <queue>
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cmath>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/wait.h>
|
||||||
|
|
||||||
|
#include <mist/socket.h>
|
||||||
|
#include <mist/http_parser.h>
|
||||||
|
#include <mist/dtsc.h>
|
||||||
|
#include <mist/ogg.h>
|
||||||
|
#include <mist/amf.h>
|
||||||
|
#include <mist/config.h>
|
||||||
|
#include <mist/stream.h>
|
||||||
|
#include <mist/timing.h>
|
||||||
|
|
||||||
|
///\brief Holds everything unique to HTTP Connectors.
|
||||||
|
namespace Connector_HTTP {
|
||||||
|
///\brief Main function for the HTTP Progressive Connector
|
||||||
|
///\param conn A socket describing the connection the client.
|
||||||
|
///\return The exit code of the connector.
|
||||||
|
int progressiveConnector(Socket::Connection conn){
|
||||||
|
bool progressive_has_sent_header = false;//Indicates whether we have sent a header.
|
||||||
|
bool ready4data = false; //Set to true when streaming is to begin.
|
||||||
|
DTSC::Stream Strm; //Incoming stream buffer.
|
||||||
|
HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender.
|
||||||
|
bool inited = false;//Whether the stream is initialized
|
||||||
|
Socket::Connection ss( -1);//The Stream Socket, used to connect to the desired stream.
|
||||||
|
std::string streamname;//Will contain the name of the stream.
|
||||||
|
|
||||||
|
//OGG specific variables
|
||||||
|
OGG::headerPages oggMeta;
|
||||||
|
OGG::Page curOggPage;
|
||||||
|
std::map <long long unsigned int, std::vector<JSON::Value> > DTSCBuffer;
|
||||||
|
std::map <long long unsigned int, long long unsigned int> prevGran;
|
||||||
|
|
||||||
|
|
||||||
|
unsigned int lastStats = 0;//Indicates the last time that we have sent stats to the server socket.
|
||||||
|
unsigned int seek_sec = 0;//Seek position in ms
|
||||||
|
unsigned int seek_byte = 0;//Seek position in bytes
|
||||||
|
|
||||||
|
bool isMP3 = false;//Indicates whether the request is audio-only mp3.
|
||||||
|
|
||||||
|
int videoID = -1;
|
||||||
|
int audioID = -1;
|
||||||
|
|
||||||
|
while (conn.connected()){
|
||||||
|
//Only attempt to parse input when not yet init'ed.
|
||||||
|
if ( !inited){
|
||||||
|
if (conn.Received().size() || conn.spool()){
|
||||||
|
//make sure it ends in a \n
|
||||||
|
if ( *(conn.Received().get().rbegin()) != '\n'){
|
||||||
|
std::string tmp = conn.Received().get();
|
||||||
|
conn.Received().get().clear();
|
||||||
|
if (conn.Received().size()){
|
||||||
|
conn.Received().get().insert(0, tmp);
|
||||||
|
}else{
|
||||||
|
conn.Received().append(tmp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (HTTP_R.Read(conn.Received().get())){
|
||||||
|
#if DEBUG >= 5
|
||||||
|
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
|
||||||
|
#endif
|
||||||
|
conn.setHost(HTTP_R.GetHeader("X-Origin"));
|
||||||
|
//we assume the URL is the stream name with a 3 letter extension
|
||||||
|
streamname = HTTP_R.getUrl().substr(1);
|
||||||
|
size_t extDot = streamname.rfind('.');
|
||||||
|
if (extDot != std::string::npos){
|
||||||
|
streamname.resize(extDot);
|
||||||
|
}; //strip the extension
|
||||||
|
int start = 0;
|
||||||
|
if ( !HTTP_R.GetVar("start").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("start").c_str());
|
||||||
|
}
|
||||||
|
if ( !HTTP_R.GetVar("starttime").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("starttime").c_str());
|
||||||
|
}
|
||||||
|
if ( !HTTP_R.GetVar("apstart").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("apstart").c_str());
|
||||||
|
}
|
||||||
|
if ( !HTTP_R.GetVar("ec_seek").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("ec_seek").c_str());
|
||||||
|
}
|
||||||
|
if ( !HTTP_R.GetVar("fs").empty()){
|
||||||
|
start = atoi(HTTP_R.GetVar("fs").c_str());
|
||||||
|
}
|
||||||
|
//under 3 hours we assume seconds, otherwise byte position
|
||||||
|
if (start < 10800){
|
||||||
|
seek_sec = start * 1000; //ms, not s
|
||||||
|
}else{
|
||||||
|
seek_byte = start; //divide by 1mbit, then *1000 for ms.
|
||||||
|
}
|
||||||
|
ready4data = true;
|
||||||
|
HTTP_R.Clean(); //clean for any possible next requests
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ready4data){
|
||||||
|
if ( !inited){
|
||||||
|
//we are ready, connect the socket!
|
||||||
|
ss = Util::Stream::getStream(streamname);
|
||||||
|
if ( !ss.connected()){
|
||||||
|
#if DEBUG >= 1
|
||||||
|
fprintf(stderr, "Could not connect to server for %s!\n", streamname.c_str());
|
||||||
|
#endif
|
||||||
|
ss.close();
|
||||||
|
HTTP_S.Clean();
|
||||||
|
HTTP_S.SetBody("No such stream is available on the system. Please try again.\n");
|
||||||
|
conn.SendNow(HTTP_S.BuildResponse("404", "Not found"));
|
||||||
|
ready4data = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
//wait until we have a header
|
||||||
|
while ( !Strm.metadata && ss.connected()){
|
||||||
|
if (ss.spool()){
|
||||||
|
Strm.parsePacket(ss.Received()); //read the metadata
|
||||||
|
}else{
|
||||||
|
Util::sleep(5);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int byterate = 0;
|
||||||
|
for (JSON::ObjIter objIt = Strm.metadata["tracks"].ObjBegin(); objIt != Strm.metadata["tracks"].ObjEnd(); objIt++){
|
||||||
|
if (videoID == -1 && objIt->second["type"].asString() == "video"){
|
||||||
|
videoID = objIt->second["trackid"].asInt();
|
||||||
|
}
|
||||||
|
if (audioID == -1 && objIt->second["type"].asString() == "audio"){
|
||||||
|
audioID = objIt->second["trackid"].asInt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (videoID != -1){
|
||||||
|
byterate += Strm.getTrackById(videoID)["bps"].asInt();
|
||||||
|
}
|
||||||
|
if (audioID != -1){
|
||||||
|
byterate += Strm.getTrackById(audioID)["bps"].asInt();
|
||||||
|
}
|
||||||
|
if ( !byterate){byterate = 1;}
|
||||||
|
seek_sec = (seek_byte / byterate) * 1000;
|
||||||
|
std::stringstream cmd;
|
||||||
|
cmd << "t";
|
||||||
|
if (videoID != -1){
|
||||||
|
cmd << " " << videoID;
|
||||||
|
}
|
||||||
|
if (audioID != -1){
|
||||||
|
cmd << " " << audioID;
|
||||||
|
}
|
||||||
|
cmd << "\ns " << seek_sec << "\np\n";
|
||||||
|
ss.SendNow(cmd.str().c_str(), cmd.str().size());
|
||||||
|
inited = true;
|
||||||
|
}
|
||||||
|
unsigned int now = Util::epoch();
|
||||||
|
if (now != lastStats){
|
||||||
|
lastStats = now;
|
||||||
|
ss.SendNow(conn.getStats("HTTP_Progressive_Ogg").c_str());
|
||||||
|
}
|
||||||
|
if (ss.spool()){
|
||||||
|
while (Strm.parsePacket(ss.Received())){
|
||||||
|
|
||||||
|
if ( !progressive_has_sent_header){
|
||||||
|
HTTP_S.Clean(); //make sure no parts of old requests are left in any buffers
|
||||||
|
HTTP_S.SetHeader("Content-Type", "video/ogg"); //Send the correct content-type for FLV files
|
||||||
|
HTTP_S.protocol = "HTTP/1.0";
|
||||||
|
conn.SendNow(HTTP_S.BuildResponse("200", "OK")); //no SetBody = unknown length - this is intentional, we will stream the entire file
|
||||||
|
//Fill in ogg header here
|
||||||
|
oggMeta.readDTSCHeader(Strm.metadata);
|
||||||
|
conn.SendNow((char*)oggMeta.parsedPages.c_str(), oggMeta.parsedPages.size());
|
||||||
|
progressive_has_sent_header = true;
|
||||||
|
}
|
||||||
|
std::cerr << "Parsing DTSC to Ogg" << std::endl;
|
||||||
|
//parse DTSC to Ogg here
|
||||||
|
long long unsigned int temp = Strm.getPacket()["trackid"].asInt();
|
||||||
|
if(prevGran[temp] != Strm.getPacket()["granule"].asInt()){
|
||||||
|
std::cerr << "Sending Ogg over connection" << std::endl;
|
||||||
|
curOggPage.clear();
|
||||||
|
curOggPage.readDTSCVector(DTSCBuffer[temp], oggMeta.DTSCID2OGGSerial[temp], oggMeta.DTSCID2seqNum[temp]);
|
||||||
|
conn.SendNow((char*)curOggPage.getPage(), curOggPage.getPageSize());
|
||||||
|
DTSCBuffer[temp].clear();
|
||||||
|
}
|
||||||
|
//long long unsigned int prevID = Strm.getPacket()["trackid"].asInt();
|
||||||
|
DTSCBuffer[temp].push_back(Strm.getPacket());
|
||||||
|
prevGran[temp] = Strm.getPacket()["granule"].asInt();
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
Util::sleep(1);
|
||||||
|
}
|
||||||
|
if ( !ss.connected()){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
conn.close();
|
||||||
|
ss.SendNow(conn.getStats("HTTP_Dynamic").c_str());
|
||||||
|
ss.close();
|
||||||
|
return 0;
|
||||||
|
} //Progressive_Connector main function
|
||||||
|
|
||||||
|
} //Connector_HTTP namespace
|
||||||
|
|
||||||
|
///\brief The standard process-spawning main function.
|
||||||
|
int main(int argc, char ** argv){
|
||||||
|
Util::Config conf(argv[0], PACKAGE_VERSION);
|
||||||
|
JSON::Value capa;
|
||||||
|
capa["desc"] = "Enables HTTP protocol progressive streaming.";
|
||||||
|
capa["deps"] = "HTTP";
|
||||||
|
capa["url_rel"] = "/$.ogg";
|
||||||
|
capa["url_match"] = "/$.ogg";
|
||||||
|
capa["url_handler"] = "http";
|
||||||
|
capa["url_type"] = "ogg";
|
||||||
|
capa["socket"] = "http_progressive_ogg";
|
||||||
|
conf.addBasicConnectorOptions(capa);
|
||||||
|
conf.parseArgs(argc, argv);
|
||||||
|
|
||||||
|
if (conf.getBool("json")){
|
||||||
|
std::cout << capa.toString() << std::endl;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
Socket::Server server_socket = Socket::Server("/tmp/mist/http_progressive_ogg");
|
||||||
|
if ( !server_socket.connected()){
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
conf.activate();
|
||||||
|
|
||||||
|
while (server_socket.connected() && conf.is_active){
|
||||||
|
Socket::Connection S = server_socket.accept();
|
||||||
|
if (S.connected()){ //check if the new connection is valid
|
||||||
|
pid_t myid = fork();
|
||||||
|
if (myid == 0){ //if new child, start MAINHANDLER
|
||||||
|
return Connector_HTTP::progressiveConnector(S);
|
||||||
|
}else{ //otherwise, do nothing or output debugging text
|
||||||
|
#if DEBUG >= 5
|
||||||
|
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} //while connected
|
||||||
|
server_socket.close();
|
||||||
|
return 0;
|
||||||
|
} //main
|
Loading…
Add table
Reference in a new issue