Added HTTP Live Connector, for HLS. Streams bipbop video up till approximately 40 seconds properly, needs to be optimized further for other videos. Audio is buggy still.

This commit is contained in:
Erik Zandvliet 2013-01-03 20:09:41 +01:00
parent acc1c1acdb
commit 0bb5a342e1

509
src/conn_http_live.cpp Normal file
View file

@ -0,0 +1,509 @@
#define DEBUG 5
/// \file conn_http_dynamic.cpp
/// Contains the main code for the HTTP Dynamic Connector
#include <iostream>
#include <iomanip>
#include <sstream>
#include <queue>
#include <cstdlib>
#include <cstdio>
#include <cmath>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <getopt.h>
#include <mist/socket.h>
#include <mist/http_parser.h>
#include <mist/json.h>
#include <mist/dtsc.h>
#include <mist/mp4.h>
#include <mist/config.h>
#include <sstream>
#include <mist/stream.h>
#include <mist/timing.h>
#include <mist/ts_packet.h>
/// Holds everything unique to HTTP Connectors.
namespace Connector_HTTP {
/// Parses the list of keyframes into 10 second fragments
std::vector<int> keyframesToFragments( JSON::Value & metadata ) {
std::vector<int> result;
if( metadata.isNull() ) {
return result;
}
result.push_back(0);
int currentBase = 0;
for (int i = 0; i < metadata["keytime"].size(); i++){
if ((metadata["keytime"][i].asInt() - currentBase) > 10000){
currentBase = metadata["keytime"][i].asInt();
result.push_back(i);
}
}
return result;
}
/// Returns a m3u or m3u8 index file
std::string BuildIndex(std::string & MovieId, JSON::Value & metadata){
std::stringstream Result;
std::vector<int> fragIndices = keyframesToFragments( metadata );
int longestFragment = 0;
for (int i = 1; i < fragIndices.size(); i++){
int fragDuration = metadata["keytime"][fragIndices[i]].asInt() - metadata["keytime"][fragIndices[i-1]].asInt();
if (fragDuration > longestFragment){
longestFragment = fragDuration;
}
}
if (metadata.isMember("length") && metadata["length"].asInt() > 0){
Result <<
"#EXTM3U\r\n"
"#EXT-X-VERSION:3\r\n"
"#EXT-X-MEDIA-SEQUENCE:1\r\n"
//"#EXT-X-ALLOW-CACHE:YES\r\n"
"#EXT-X-TARGETDURATION:" << (longestFragment / 1000) + 1 << "\r\n"
"#EXT-X-PLAYLIST-TYPE:VOD\r\n";
int lastDuration = 0;
int lastBytePos = 0;
bool writeOffset = true;
int j = 0;
for (int i = 0; i < fragIndices.size(); i++) {
if (lastDuration != 0){
Result << "#EXTINF:" << (metadata["keytime"][fragIndices[i]].asInt() - lastDuration) / 1000 << "." << std::setw(3) << std::setfill('0') << ((metadata["keytime"][fragIndices[i]].asInt() - lastDuration) % 1000) << ",\r\n"
<< ++j << ".ts\r\n";
}
lastDuration = metadata["keytime"][fragIndices[i]].asInt();
lastBytePos = metadata["keybpos"][fragIndices[i]].asInt();
}
Result << "#EXT-X-ENDLIST";
}else{
Result <<
"#EXTM3U\r\n"
"#EXT-X-VERSION:4\r\n"
"#EXT-X-MEDIA-SEQUENCE:1\r\n"
"#EXT-X-TARGETDURATION:" << ((metadata["video"]["keyms"].asInt() + metadata["video"]["keyvar"].asInt()) / 1000) + 1 << "\r\n";
}
#if DEBUG >= 8
std::cerr << "Sending this index:" << std::endl << Result.str() << std::endl;
#endif
return Result.str();
} //BuildIndex
/// Main function for Connector_HTTP_Live
int Connector_HTTP_Live(Socket::Connection conn){
std::stringstream TSBuf;
long long int TSBufTime = 0;
DTSC::Stream Strm; //Incoming stream buffer.
HTTP::Parser HTTP_R, HTTP_S; //HTTP Receiver en HTTP Sender.
bool ready4data = false; //Set to true when streaming is to begin.
bool pending_manifest = false;
bool receive_marks = false; //when set to true, this stream will ignore keyframes and instead use pause marks
bool inited = false;
Socket::Connection ss( -1);
std::string streamname;
std::string recBuffer = "";
std::string ToPack;
TS::Packet PackData;
std::string DTMIData;
int PacketNumber = 0;
long long unsigned int TimeStamp = 0;
int ThisNaluSize;
char VideoCounter = 0;
char AudioCounter = 0;
bool WritePesHeader;
bool IsKeyFrame;
bool FirstKeyFrame = true;
bool FirstIDRInKeyFrame;
MP4::AVCC avccbox;
bool haveAvcc = false;
std::vector<int> fragIndices;
int Segment = -1;
int temp;
int Flash_RequestPending = 0;
unsigned int lastStats = 0;
conn.setBlocking(false); //do not block on conn.spool() when no data is available
while (conn.connected()){
if (conn.spool() || conn.Received().size()){
//make sure it ends in a \n
if ( *(conn.Received().get().rbegin()) != '\n'){
std::string tmp = conn.Received().get();
conn.Received().get().clear();
if (conn.Received().size()){
conn.Received().get().insert(0, tmp);
}else{
conn.Received().append(tmp);
}
}
if (HTTP_R.Read(conn.Received().get())){
#if DEBUG >= 4
std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
#endif
conn.setHost(HTTP_R.GetHeader("X-Origin"));
if (HTTP_R.url.find(".m3u") == std::string::npos){
streamname = HTTP_R.url.substr(5, HTTP_R.url.find("/", 5) - 5);
if ( !ss){
ss = Util::Stream::getStream(streamname);
if ( !ss.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
ss.close();
HTTP_S.Clean();
HTTP_S.SetBody("No such stream is available on the system. Please try again.\n");
conn.SendNow(HTTP_S.BuildResponse("404", "Not found"));
ready4data = false;
continue;
}
ss.setBlocking(false);
inited = true;
}
temp = HTTP_R.url.find("/", 5) + 1;
Segment = atoi(HTTP_R.url.substr(temp, HTTP_R.url.find(".ts", temp) - temp).c_str());
if( !fragIndices.size() ) {
fragIndices = keyframesToFragments( Strm.metadata );
}
#if DEBUG >= 4
fprintf( stderr, "Fragment number %d\n", Segment);
fprintf( stderr, "Fragment indices %d\n", fragIndices.size() );
#endif
std::stringstream sstream;
sstream << "f " << Segment + 1 << "\n";
int frameCount;
if (Segment == fragIndices.size() - 1){
frameCount = Strm.metadata["keytime"].size() - fragIndices[Segment];
}else{
frameCount = Strm.metadata["keytime"][fragIndices[Segment+1]].asInt() - Strm.metadata["keytime"][fragIndices[Segment]].asInt();
}
#if DEBUG >= 4
fprintf( stderr, "Frame count %d\n", frameCount);
#endif
for (int i = 0; i < frameCount; i++){
sstream << "o \n";
}
ss.SendNow(sstream.str().c_str());
Flash_RequestPending++;
}else{
streamname = HTTP_R.url.substr(5, HTTP_R.url.find("/", 5) - 5);
if ( !Strm.metadata.isNull()){
HTTP_S.Clean();
if (HTTP_R.url.find(".m3u8") != std::string::npos) {
HTTP_S.SetHeader("Content-Type", "application/vnd.apple.mpegurl");//m3u8
}else{
HTTP_S.SetHeader("Content-Type", "audio/mpegurl");//m3u
}
HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){
receive_marks = true;
}
std::string manifest = BuildIndex(streamname, Strm.metadata);
HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
#if DEBUG >= 3
printf("Sent manifest\n");
#endif
pending_manifest = false;
}else{
pending_manifest = true;
}
}
ready4data = true;
HTTP_R.Clean(); //clean for any possible next requests
}
}else{
if (Flash_RequestPending){
usleep(1000); //sleep 1ms
}else{
usleep(10000); //sleep 10ms
}
}
if (ready4data){
if ( !inited){
//we are ready, connect the socket!
ss = Util::Stream::getStream(streamname);
if ( !ss.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
ss.close();
HTTP_S.Clean();
HTTP_S.SetBody("No such stream is available on the system. Please try again.\n");
conn.SendNow(HTTP_S.BuildResponse("404", "Not found"));
ready4data = false;
continue;
}
ss.setBlocking(false);
#if DEBUG >= 3
fprintf(stderr, "Everything connected, starting to send video data...\n");
#endif
inited = true;
}
unsigned int now = Util::epoch();
if (now != lastStats){
lastStats = now;
ss.SendNow(conn.getStats("HTTP_Live").c_str());
}
if (ss.spool()){
while (Strm.parsePacket(ss.Received())){
if (Strm.getPacket(0).isMember("time")){
if ( !Strm.metadata.isMember("firsttime")){
Strm.metadata["firsttime"] = Strm.getPacket(0)["time"];
}else{
if ( !Strm.metadata.isMember("length") || Strm.metadata["length"].asInt() == 0){
Strm.getPacket(0)["time"] = Strm.getPacket(0)["time"].asInt() - Strm.metadata["firsttime"].asInt();
}
}
Strm.metadata["lasttime"] = Strm.getPacket(0)["time"];
}
if (pending_manifest){
HTTP_S.Clean();
HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){
receive_marks = true;
}
std::string manifest = BuildIndex(streamname, Strm.metadata);
if (HTTP_R.url.find(".m3u8") != std::string::npos) {
HTTP_S.SetHeader("Content-Type", "application/vnd.apple.mpegurl");//m3u8
}else{
HTTP_S.SetHeader("Content-Type", "audio/mpegurl");//m3u
}
HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
#if DEBUG >= 3
printf("Sent manifest\n");
#endif
pending_manifest = false;
}
if ( !receive_marks && Strm.metadata.isMember("length")){
receive_marks = true;
}
if ((Strm.getPacket(0).isMember("keyframe") && !receive_marks) || Strm.lastType() == DTSC::PAUSEMARK){
#if DEBUG >= 4
fprintf(stderr, "Received a %s fragment of %i bytes.\n", Strm.getPacket(0)["datatype"].asString().c_str(), TSBuf.rdbuf()->in_avail());
#endif
if (Flash_RequestPending > 0 && TSBuf.rdbuf()->in_avail()){
#if DEBUG >= 3
fprintf(stderr, "Sending a fragment...");
#endif
HTTP_S.Clean();
HTTP_S.SetHeader("Content-Type", "video/MP2T");
HTTP_S.SetBody("");
HTTP_S.SetHeader("Content-Length", TSBuf.rdbuf()->in_avail());
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
conn.SendNow(TSBuf.str().c_str(),TSBuf.rdbuf()->in_avail());
TSBuf.clear();
Flash_RequestPending--;
PacketNumber = 0;
#if DEBUG >= 3
fprintf(stderr, "Done\n");
#endif
}
TSBuf.clear();
}
if( !haveAvcc ) {
avccbox.setPayload( Strm.metadata["video"]["init"].asString() );
haveAvcc = true;
}
if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){
if( Strm.lastType() == DTSC::VIDEO ) {
DTMIData = Strm.lastData();
if( Strm.getPacket(0).isMember("keyframe") ) {
IsKeyFrame = true;
FirstIDRInKeyFrame = true;
} else {
IsKeyFrame = false;
FirstKeyFrame = false;
}
if( IsKeyFrame ) {
TimeStamp = ( Strm.getPacket(0)["time"].asInt() * 27000 );
}
int TSType;
bool FirstPic = true;
bool haveInit = false;
std::string videoBuffer;
while( DTMIData.size() ) {
ThisNaluSize = (DTMIData[0] << 24) + (DTMIData[1] << 16) +
(DTMIData[2] << 8) + DTMIData[3];
DTMIData.erase(0,4);//Erase the first four characters;
TSType = (int)DTMIData[0] & 0x1F;
if( TSType == 0x09 ) {
DTMIData.erase(0,ThisNaluSize);
continue;
}
if( TSType == 0x07 || TSType == 0x08 ) {
haveInit = true;
}
if( TSType == 0x05 ) {
if( FirstPic ) {
if( !haveInit ) {
ToPack += avccbox.asAnnexB( );
}
FirstPic = false;
}
if( IsKeyFrame ) {
if( !FirstKeyFrame && FirstIDRInKeyFrame ) {
ToPack += (char)0x00;
FirstIDRInKeyFrame = false;
}
ToPack.append(TS::ShortNalHeader,3);
}
} else if ( TSType == 0x01 ) {
if( FirstPic ) {
ToPack += (char)0x00;
FirstPic = false;
}
ToPack.append(TS::ShortNalHeader,3);
} else {
ToPack.append(TS::NalHeader,4);
}
ToPack.append(DTMIData,0,ThisNaluSize);
DTMIData.erase(0,ThisNaluSize);
}
WritePesHeader = true;
while( ToPack.size() ) {
if (PacketNumber == 0) {
PackData.DefaultPAT();
TSBuf << std::string(PackData.ToString(), 188);
PackData.DefaultPMT();
TSBuf << std::string(PackData.ToString(), 188);
PacketNumber += 2;
}
PackData.Clear();
PackData.PID( 0x100 );
PackData.ContinuityCounter( VideoCounter );
VideoCounter ++;
if( WritePesHeader ) {
PackData.UnitStart( 1 );
if( IsKeyFrame ) {
PackData.RandomAccess( 1 );
PackData.PCR( TimeStamp );
} else {
PackData.AdaptationField( 1 );
}
PackData.AddStuffing( 184 - (20+ToPack.size()) );
PackData.PESVideoLeadIn( ToPack.size(), Strm.getPacket(0)["time"].asInt() * 90 );
WritePesHeader = false;
} else {
PackData.AdaptationField( 1 );
PackData.AddStuffing( 184 - (ToPack.size()) );
}
PackData.FillFree( ToPack );
TSBuf << std::string(PackData.ToString(), 188);
PacketNumber ++;
}
} else if( Strm.lastType() == DTSC::AUDIO ) {
WritePesHeader = true;
std::string audioBuffer = TS::GetAudioHeader( DTMIData.size(), Strm.metadata["audio"]["init"].asString() );
int fullSize = Strm.lastData().size() + audioBuffer.size();
int currentOffset = 0;
while( fullSize ) {
if ( PacketNumber == 0 ) {
PackData.DefaultPAT();
TSBuf << std::string(PackData.ToString(), 188);
PackData.DefaultPMT();
TSBuf << std::string(PackData.ToString(), 188);
PacketNumber += 2;
}
PackData.Clear();
PackData.PID( 0x101 );
PackData.ContinuityCounter( AudioCounter );
AudioCounter ++;
if( WritePesHeader ) {
PackData.UnitStart( 1 );
PackData.RandomAccess( 0 );
PackData.AddStuffing( 184 - (14 + fullSize) );
PackData.PESAudioLeadIn( fullSize, Strm.getPacket(0)["time"].asInt() * 81000 );
WritePesHeader = false;
} else {
PackData.AdaptationField( 1 );
PackData.AddStuffing( 184 - fullSize );
}
PackData.FillFree( audioBuffer );
if( PackData.BytesFree( ) ) {
currentOffset += PackData.FillFree( Strm.lastData().c_str() + currentOffset, Strm.lastData().size() - currentOffset );
}
TSBuf << std::string(PackData.ToString(), 188);
PacketNumber ++;
fullSize = audioBuffer.size() + (Strm.lastData().size() - currentOffset);
}
}
}
}
if (pending_manifest && !Strm.metadata.isNull()){
HTTP_S.Clean();
HTTP_S.SetHeader("Cache-Control", "no-cache");
if (Strm.metadata.isMember("length")){
receive_marks = true;
}
if (HTTP_R.url.find(".m3u8") != std::string::npos) {
HTTP_S.SetHeader("Content-Type", "application/vnd.apple.mpegurl");//m3u8
}else{
HTTP_S.SetHeader("Content-Type", "audio/mpegurl");//m3u
}
std::string manifest = BuildIndex(streamname, Strm.metadata);
HTTP_S.SetBody(manifest);
conn.SendNow(HTTP_S.BuildResponse("200", "OK"));
#if DEBUG >= 3
printf("Sent manifest\n");
#endif
pending_manifest = false;
}
}
if ( !ss.connected()){
break;
}
}
}
conn.close();
ss.SendNow(conn.getStats("HTTP_Live").c_str());
ss.close();
#if DEBUG >= 1
fprintf(stderr, "User %i disconnected.\n", conn.getSocket());
if (inited){
fprintf(stderr, "Status was: inited\n");
}else{
if (ready4data){
fprintf(stderr, "Status was: ready4data\n");
}else{
fprintf(stderr, "Status was: connected\n");
}
}
#endif
return 0;
} //Connector_HTTP_Dynamic main function
} //Connector_HTTP_Dynamic namespace
int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addConnectorOptions(1935);
conf.parseArgs(argc, argv);
Socket::Server server_socket = Socket::Server("/tmp/mist/http_live");
if ( !server_socket.connected()){
return 1;
}
conf.activate();
while (server_socket.connected() && conf.is_active){
Socket::Connection S = server_socket.accept();
if (S.connected()){ //check if the new connection is valid
pid_t myid = fork();
if (myid == 0){ //if new child, start MAINHANDLER
return Connector_HTTP::Connector_HTTP_Live(S);
}else{ //otherwise, do nothing or output debugging text
#if DEBUG >= 3
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());
#endif
}
}
} //while connected
server_socket.close();
return 0;
} //main