Fix IP addresses for HTTP connectors, update all code to only use non-deprecated libmist calls.

This commit is contained in:
Thulinma 2012-08-26 19:58:31 +02:00
parent b141b4058e
commit afefe24578
7 changed files with 84 additions and 59 deletions

View file

@ -39,7 +39,8 @@ namespace Buffer{
StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
}
if (StatsSocket.connected()){
StatsSocket.write(Stream::get()->getStats()+"\n\n");
StatsSocket.Send(Stream::get()->getStats()+"\n\n");
StatsSocket.flush();
}
}
StatsSocket.close();
@ -50,23 +51,18 @@ namespace Buffer{
std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl;
usr->myRing = thisStream->getRing();
if (!usr->S.write(thisStream->getHeader())){
usr->Disconnect("failed to receive the header!");
return;
}
usr->S.Send(thisStream->getHeader());
usr->S.flush();
while (usr->S.connected()){
usleep(5000); //sleep 5ms
if (usr->S.canRead()){
usr->inbuffer.clear();
char charbuf;
while ((usr->S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){
usr->inbuffer += charbuf;
}
if (usr->inbuffer != ""){
if (usr->inbuffer[0] == 'P'){
std::cout << "Push attempt from IP " << usr->inbuffer.substr(2) << std::endl;
if (thisStream->checkWaitingIP(usr->inbuffer.substr(2))){
if (usr->S.spool() && usr->S.Received().find('\n') != std::string::npos){
std::string cmd = usr->S.Received().substr(0, usr->S.Received().find('\n'));
usr->S.Received().erase(0, usr->S.Received().find('\n')+1);
if (cmd != ""){
if (cmd[0] == 'P'){
std::cout << "Push attempt from IP " << cmd.substr(2) << std::endl;
if (thisStream->checkWaitingIP(cmd.substr(2))){
if (thisStream->setInput(usr->S)){
std::cout << "Push accepted!" << std::endl;
usr->S = Socket::Connection(-1);
@ -78,8 +74,8 @@ namespace Buffer{
usr->Disconnect("Push denied - invalid IP address!");
}
}
if (usr->inbuffer[0] == 'S'){
usr->tmpStats = Stats(usr->inbuffer.substr(2));
if (cmd[0] == 'S'){
usr->tmpStats = Stats(cmd.substr(2));
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
if (secs < 1){secs = 1;}
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
@ -140,21 +136,18 @@ namespace Buffer{
/// No changes to the speed are made.
void handlePushin(void * empty){
if (empty != 0){return;}
std::string inBuffer;
while (buffer_running){
if (thisStream->getIPInput().connected()){
if (inBuffer.size() > 0){
if (thisStream->getIPInput().spool()){
thisStream->getWriteLock();
if (thisStream->getStream()->parsePacket(inBuffer)){
if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){
thisStream->getStream()->outPacket(0);
thisStream->dropWriteLock(true);
}else{
thisStream->dropWriteLock(false);
thisStream->getIPInput().iread(inBuffer);
usleep(1000);//1ms wait
}
}else{
thisStream->getIPInput().iread(inBuffer);
usleep(1000);//1ms wait
}
}else{
@ -199,7 +192,7 @@ namespace Buffer{
while (buffer_running && SS.connected() && conf.is_active){
//check for new connections, accept them if there are any
//starts a thread for every accepted connection
incoming = SS.accept(false);
incoming = SS.accept(true);
if (incoming.connected()){
user * usr_ptr = new user(incoming);
thisStream->addUser(usr_ptr);

View file

@ -32,12 +32,14 @@ Buffer::user::~user(){
/// Disconnects the current user. Doesn't do anything if already disconnected.
/// Prints "Disconnected user" to stdout if disconnect took place.
void Buffer::user::Disconnect(std::string reason) {
Stream::get()->clearStats(MyStr, lastStats, reason);
if (S.connected()){S.close();}
if (Thread != 0){
if (Thread->joinable()){Thread->join();}
if (Thread->joinable()){
Thread->join();
}
Thread = 0;
}
Stream::get()->clearStats(MyStr, lastStats, reason);
}//Disconnect
/// Tries to send the current buffer, returns true if success, false otherwise.

View file

@ -155,6 +155,7 @@ namespace Connector_HTTP{
//create a unique ID based on a hash of the user agent and host, followed by the stream name and connector
std::string uid = md5(H.GetHeader("User-Agent")+conn->getHost())+"_"+H.GetVar("stream")+"_"+connector;
H.SetHeader("X-UID", uid);//add the UID to the headers before copying
H.SetHeader("X-Origin", conn->getHost());//add the UID to the headers before copying
std::string request = H.BuildRequest();//copy the request for later forwarding to the connector
H.Clean();

View file

@ -136,6 +136,7 @@ namespace Connector_HTTP{
#if DEBUG >= 4
std::cout << "Received request: " << HTTP_R.url << std::endl;
#endif
conn.setHost(HTTP_R.GetHeader("X-Origin"));
if (HTTP_R.url.find("f4m") == std::string::npos){
streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 );
@ -249,6 +250,8 @@ namespace Connector_HTTP{
}
}
conn.close();
ss.Send("S "+conn.getStats("HTTP_Dynamic"));
ss.flush();
ss.close();
#if DEBUG >= 1
if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());}

View file

@ -42,6 +42,7 @@ namespace Connector_HTTP{
#if DEBUG >= 4
std::cout << "Received request: " << HTTP_R.url << std::endl;
#endif
conn.setHost(HTTP_R.GetHeader("X-Origin"));
//we assume the URL is the stream name with a 3 letter extension
std::string extension = HTTP_R.url.substr(HTTP_R.url.size()-4);
streamname = HTTP_R.url.substr(0, HTTP_R.url.size()-4);//strip the extension
@ -115,6 +116,8 @@ namespace Connector_HTTP{
}
}
conn.close();
ss.Send("S "+conn.getStats("HTTP_Dynamic"));
ss.flush();
ss.close();
#if DEBUG >= 1
if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());}

View file

@ -2,6 +2,7 @@
/// Contains the main code for the RAW connector.
#include <iostream>
#include <sstream>
#include <mist/config.h>
#include <mist/socket.h>
@ -13,17 +14,34 @@ int main(int argc, char ** argv) {
conf.addOption("stream_name", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the stream to write to stdout.\"}"));
conf.parseArgs(argc, argv);
std::string input = "/tmp/shared_socket_" + conf.getString("stream_name");
//connect to the proper stream
Socket::Connection S(input);
Socket::Connection S = Socket::getStream(conf.getString("stream_name"));
S.setBlocking(false);
if (!S.connected()){
std::cout << "Could not open stream " << conf.getString("stream_name") << std::endl;
return 1;
}
//transport ~50kb at a time
//this is a nice tradeoff between CPU usage and speed
const char buffer[50000] = {0};
while(std::cout.good() && S.read(buffer,50000)){std::cout.write(buffer,50000);}
unsigned int lastStats = 0;
unsigned int started = time(0);
while(std::cout.good()){
if (S.spool()){
std::cout.write(S.Received().c_str(),S.Received().size());
S.Received().clear();
}else{
usleep(10000);//sleep 10ms if no data
}
unsigned int now = time(0);
if (now != lastStats){
lastStats = now;
std::stringstream st;
st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
S.Send(st.str());
}
}
std::stringstream st;
st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
S.Send(st.str());
S.flush();
S.close();
return 0;
}

View file

@ -39,17 +39,20 @@ namespace Connector_RTMP{
/// Main Connector_RTMP function
int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
Socket = conn;
Socket.setBlocking(false);
FLV::Tag tag, init_tag;
DTSC::Stream Strm;
bool stream_inited = false;//true if init data for audio/video was sent
RTMPStream::handshake_in.reserve(1537);
Socket.read((char*)RTMPStream::handshake_in.c_str(), 1537);
while (Socket.Received().size() < 1537 && Socket.connected()){Socket.spool(); usleep(5000);}
RTMPStream::handshake_in = Socket.Received().substr(0, 1537);
Socket.Received().erase(0, 1537);
RTMPStream::rec_cnt += 1537;
if (RTMPStream::doHandshake()){
Socket.write(RTMPStream::handshake_out);
Socket.read((char*)RTMPStream::handshake_in.c_str(), 1536);
Socket.Send(RTMPStream::handshake_out);
while (Socket.Received().size() < 1536 && Socket.connected()){Socket.spool(); usleep(5000);}
Socket.Received().erase(0, 1536);
RTMPStream::rec_cnt += 1536;
#if DEBUG >= 4
fprintf(stderr, "Handshake succcess!\n");
@ -62,11 +65,12 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
}
unsigned int lastStats = 0;
conn.setBlocking(false);
bool firstrun = true;
while (Socket.connected()){
usleep(10000);//sleep 10ms to prevent high CPU usage
if (Socket.spool()){
if (Socket.spool() || firstrun){
firstrun = false;
parseChunk(Socket.Received());
}
if (ready4data){
@ -80,6 +84,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
Socket.close();//disconnect user
break;
}
SS.setBlocking(false);
#if DEBUG >= 3
fprintf(stderr, "Everything connected, starting to send video data...\n");
#endif
@ -89,8 +94,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
unsigned int now = time(0);
if (now != lastStats){
lastStats = now;
std::string stat = "S "+Socket.getStats("RTMP");
SS.write(stat);
SS.Send("S "+Socket.getStats("RTMP"));
}
}
if (SS.spool()){
@ -119,8 +123,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
}
}
}
SS.close();
Socket.close();
SS.Send("S "+Socket.getStats("RTMP"));
SS.flush();
SS.close();
#if DEBUG >= 1
if (FLV::Parse_Error){fprintf(stderr, "FLV Parse Error: %s\n", FLV::Error_Str.c_str());}
fprintf(stderr, "User %i disconnected.\n", conn.getSocket());
@ -157,7 +163,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
//send ACK if we received a whole window
if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)){
RTMPStream::rec_window_at = RTMPStream::rec_cnt;
Socket.write(RTMPStream::SendCTL(3, RTMPStream::rec_cnt));//send ack (msg 3)
Socket.Send(RTMPStream::SendCTL(3, RTMPStream::rec_cnt));//send ack (msg 3)
}
switch (next.msg_type_id){
@ -207,7 +213,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
#endif
RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str());
RTMPStream::rec_window_at = RTMPStream::rec_cnt;
Socket.write(RTMPStream::SendCTL(3, RTMPStream::rec_cnt));//send ack (msg 3)
Socket.Send(RTMPStream::SendCTL(3, RTMPStream::rec_cnt));//send ack (msg 3)
break;
case 6:
#if DEBUG >= 4
@ -215,7 +221,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
#endif
//4 bytes window size, 1 byte limit type (ignored)
RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str());
Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
Socket.Send(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
break;
case 8://audio data
case 9://video data
@ -228,15 +234,15 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
counter++;
if (counter > 8){
sending = true;
SS.write(meta_out.toNetPacked());
SS.write(prebuffer.str());//write buffer
SS.Send(meta_out.toNetPacked());
SS.Send(prebuffer.str());//write buffer
prebuffer.str("");//clear buffer
SS.write(pack_out.toNetPacked());
SS.Send(pack_out.toNetPacked());
}else{
prebuffer << pack_out.toNetPacked();
}
}else{
SS.write(pack_out.toNetPacked());
SS.Send(pack_out.toNetPacked());
}
}
}else{
@ -301,9 +307,9 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int stream_id){
if (messagetype == 17){
Socket.write(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack()));
Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack()));
}else{
Socket.write(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack()));
Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack()));
}
}//sendCommand
@ -319,7 +325,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
if (amfdata.getContentP(2)->getContentP("objectEncoding")){
objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue();
}
fprintf(stderr, "Object encoding set to %e\n", objencoding);
#if DEBUG >= 4
int tmpint;
if (amfdata.getContentP(2)->getContentP("videoCodecs")){
@ -334,10 +339,10 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
}
#endif
RTMPStream::chunk_snd_max = 4096;
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6)
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Socket.Send(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
Socket.Send(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6)
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
@ -377,7 +382,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
return;
}//createStream
if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){
@ -408,7 +413,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
Socket.close();//disconnect user
return;
}
SS.write("P "+Socket.getHost()+'\n');
SS.Send("P "+Socket.getHost()+'\n');
nostats = true;
#if DEBUG >= 4
fprintf(stderr, "Connected to buffer, starting to send data...\n");
@ -424,7 +429,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
@ -457,7 +462,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){
//send streambegin
streamname = amfdata.getContentP(3)->StrValue();
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
@ -488,7 +493,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
#endif
sendCommand(amfreply, messagetype, stream_id);
RTMPStream::chunk_snd_max = 102400;//100KiB
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Connector_RTMP::ready4data = true;//start sending video data!
return;
}//createStream