From a09ef74ca2bd43daa33c5a439d4c22b899a221ef Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 20 Jun 2011 19:27:59 +0200 Subject: [PATCH 1/5] Fix missing socket.h/cpp --- util/socket.cpp | 444 ++++++++++++++++++++++++++++++++++++++++++++++++ util/socket.h | 65 +++++++ 2 files changed, 509 insertions(+) create mode 100644 util/socket.cpp create mode 100644 util/socket.h diff --git a/util/socket.cpp b/util/socket.cpp new file mode 100644 index 00000000..60c3c69e --- /dev/null +++ b/util/socket.cpp @@ -0,0 +1,444 @@ +/// \file socket.cpp +/// A handy Socket wrapper library. +/// Written by Jaron Vietor in 2010 for DDVTech + +#include "socket.h" + +/// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection. +/// \param sockNo Integer representing the socket to convert. +Socket::Connection::Connection(int sockNo){ + sock = sockNo; + Error = false; + Blocking = false; +}//Socket::Connection basic constructor + +/// Create a new disconnected base socket. This is a basic constructor for placeholder purposes. +/// A socket created like this is always disconnected and should/could be overwritten at some point. +Socket::Connection::Connection(){ + sock = -1; + Error = false; + Blocking = false; +}//Socket::Connection basic constructor + +/// Close connection. The internal socket is closed and then set to -1. +void Socket::Connection::close(){ + #if DEBUG >= 4 + fprintf(stderr, "Socket closed.\n"); + #endif + shutdown(sock, SHUT_RDWR); + ::close(sock); + sock = -1; +}//Socket::Connection::close + +/// Returns internal socket number. +int Socket::Connection::getSocket(){return sock;} + +/// Create a new Unix Socket. This socket will (try to) connect to the given address right away. +/// \param address String containing the location of the Unix socket to connect to. +/// \param nonblock Whether the socket should be nonblocking. False by default. +Socket::Connection::Connection(std::string address, bool nonblock){ + sock = socket(PF_UNIX, SOCK_STREAM, 0); + if (sock < 0){ + #if DEBUG >= 1 + fprintf(stderr, "Could not create socket! Error: %s\n", strerror(errno)); + #endif + return; + } + Error = false; + Blocking = false; + sockaddr_un addr; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, address.c_str(), address.size()+1); + int r = connect(sock, (sockaddr*)&addr, sizeof(addr)); + if (r == 0){ + if (nonblock){ + int flags = fcntl(sock, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(sock, F_SETFL, flags); + } + }else{ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to %s! Error: %s\n", address.c_str(), strerror(errno)); + #endif + close(); + } +}//Socket::Connection Unix Contructor + +/// Returns the ready-state for this socket. +/// \returns 1 if data is waiting to be read, -1 if not connected, 0 otherwise. +signed int Socket::Connection::ready(){ + if (sock < 0) return -1; + char tmp; + int preflags = fcntl(sock, F_GETFL, 0); + int postflags = preflags | O_NONBLOCK; + fcntl(sock, F_SETFL, postflags); + int r = recv(sock, &tmp, 1, MSG_PEEK); + fcntl(sock, F_SETFL, preflags); + if (r < 0){ + if (errno == EAGAIN || errno == EWOULDBLOCK){ + return 0; + }else{ + #if DEBUG >= 2 + fprintf(stderr, "Socket ready error! Error: %s\n", strerror(errno)); + #endif + close(); + return -1; + } + } + if (r == 0){ + close(); return -1; + } + return r; +} + +/// Returns the connected-state for this socket. +/// Note that this function might be slightly behind the real situation. +/// The connection status is updated after every read/write attempt, when errors occur +/// and when the socket is closed manually. +/// \returns True if socket is connected, false otherwise. +bool Socket::Connection::connected(){ + return (sock >= 0); +} + +/// Writes data to socket. This function blocks if the socket is blocking and all data cannot be written right away. +/// If the socket is nonblocking and not all data can be written, this function sets internal variable Blocking to true +/// and returns false. +/// \param buffer Location of the buffer to write from. +/// \param len Amount of bytes to write. +/// \returns True if the whole write was succesfull, false otherwise. +bool Socket::Connection::write(const void * buffer, int len){ + int sofar = 0; + if (sock < 0){return false;} + while (sofar != len){ + int r = send(sock, (char*)buffer + sofar, len-sofar, 0); + if (r <= 0){ + Error = true; + #if DEBUG >= 2 + fprintf(stderr, "Could not write data! Error: %s\n", strerror(errno)); + #endif + close(); + return false; + }else{ + sofar += r; + } + } + return true; +}//DDv::Socket::write + + +/// Reads data from socket. This function blocks if the socket is blocking and all data cannot be read right away. +/// If the socket is nonblocking and not all data can be read, this function sets internal variable Blocking to true +/// and returns false. +/// \param buffer Location of the buffer to read to. +/// \param len Amount of bytes to read. +/// \returns True if the whole read was succesfull, false otherwise. +bool Socket::Connection::read(void * buffer, int len){ + int sofar = 0; + if (sock < 0){return false;} + while (sofar != len){ + int r = recv(sock, (char*)buffer + sofar, len-sofar, 0); + if (r < 0){ + switch (errno){ + case EWOULDBLOCK: return 0; break; + default: + Error = true; + #if DEBUG >= 2 + fprintf(stderr, "Could not read data! Error %i: %s\n", r, strerror(errno)); + #endif + close(); + break; + } + return false; + }else{ + if (r == 0){ + Error = true; + #if DEBUG >= 2 + fprintf(stderr, "Could not read data! Socket is closed.\n"); + #endif + close(); + return false; + } + sofar += r; + } + } + return true; +}//Socket::Connection::read + +/// Read call that is compatible with file access syntax. This function simply calls the other read function. +bool Socket::Connection::read(void * buffer, int width, int count){return read(buffer, width*count);} +/// Write call that is compatible with file access syntax. This function simply calls the other write function. +bool Socket::Connection::write(void * buffer, int width, int count){return write(buffer, width*count);} +/// Write call that is compatible with std::string. This function simply calls the other write function. +bool Socket::Connection::write(const std::string data){return write(data.c_str(), data.size());} + +/// Incremental write call. This function tries to write len bytes to the socket from the buffer, +/// returning the amount of bytes it actually wrote. +/// \param buffer Location of the buffer to write from. +/// \param len Amount of bytes to write. +/// \returns The amount of bytes actually written. +int Socket::Connection::iwrite(void * buffer, int len){ + if (sock < 0){return 0;} + int r = send(sock, buffer, len, 0); + if (r < 0){ + switch (errno){ + case EWOULDBLOCK: return 0; break; + default: + Error = true; + #if DEBUG >= 2 + fprintf(stderr, "Could not iwrite data! Error: %s\n", strerror(errno)); + #endif + close(); + return 0; + break; + } + } + if (r == 0){close();} + return r; +}//Socket::Connection::iwrite + +/// Incremental read call. This function tries to read len bytes to the buffer from the socket, +/// returning the amount of bytes it actually read. +/// \param buffer Location of the buffer to read to. +/// \param len Amount of bytes to read. +/// \returns The amount of bytes actually read. +int Socket::Connection::iread(void * buffer, int len){ + if (sock < 0){return 0;} + int r = recv(sock, buffer, len, 0); + if (r < 0){ + switch (errno){ + case EWOULDBLOCK: return 0; break; + default: + Error = true; + #if DEBUG >= 2 + fprintf(stderr, "Could not iread data! Error: %s\n", strerror(errno)); + #endif + close(); + return 0; + break; + } + } + if (r == 0){close();} + return r; +}//Socket::Connection::iread + +/// Read call that is compatible with std::string. +/// Data is read using iread (which is nonblocking if the Socket::Connection itself is), +/// then appended to end of buffer. This functions reads at least one byte before returning. +/// \param buffer std::string to append data to. +/// \return True if new data arrived, false otherwise. +bool Socket::Connection::read(std::string & buffer){ + char cbuffer[5000]; + if (!read(cbuffer, 1)){return false;} + int num = iread(cbuffer+1, 4999); + if (num > 0){ + buffer.append(cbuffer, num+1); + }else{ + buffer.append(cbuffer, 1); + } + return true; +}//read + +/// Read call that is compatible with std::string. +/// Data is read using iread (which is nonblocking if the Socket::Connection itself is), +/// then appended to end of buffer. +/// \param buffer std::string to append data to. +/// \return True if new data arrived, false otherwise. +bool Socket::Connection::iread(std::string & buffer){ + char cbuffer[5000]; + int num = iread(cbuffer, 5000); + if (num < 1){return false;} + buffer.append(cbuffer, num); + return true; +}//iread + +/// Incremental write call that is compatible with std::string. +/// Data is written using iwrite (which is nonblocking if the Socket::Connection itself is), +/// then removed from front of buffer. +/// \param buffer std::string to remove data from. +/// \return True if more data was sent, false otherwise. +bool Socket::Connection::iwrite(std::string & buffer){ + if (buffer.size() < 1){return false;} + int tmp = iwrite((void*)buffer.c_str(), buffer.size()); + if (tmp < 1){return false;} + buffer = buffer.substr(tmp); + return true; +}//iwrite + +/// Write call that is compatible with std::string. +/// Data is written using write (which is always blocking), +/// then removed from front of buffer. +/// \param buffer std::string to remove data from. +/// \return True if more data was sent, false otherwise. +bool Socket::Connection::swrite(std::string & buffer){ + if (buffer.size() < 1){return false;} + bool tmp = write((void*)buffer.c_str(), buffer.size()); + if (tmp){buffer = "";} + return tmp; +}//write + +/// Gets hostname for connection, if available. +std::string Socket::Connection::getHost(){ + return remotehost; +} + +/// Create a new base Server. The socket is never connected, and a placeholder for later connections. +Socket::Server::Server(){ + sock = -1; +}//Socket::Server base Constructor + +/// Create a new TCP Server. The socket is immediately bound and set to listen. +/// A maximum of 100 connections will be accepted between accept() calls. +/// Any further connections coming in will be dropped. +/// \param port The TCP port to listen on +/// \param hostname (optional) The interface to bind to. The default is 0.0.0.0 (all interfaces). +/// \param nonblock (optional) Whether accept() calls will be nonblocking. Default is false (blocking). +Socket::Server::Server(int port, std::string hostname, bool nonblock){ + sock = socket(AF_INET6, SOCK_STREAM, 0); + if (sock < 0){ + #if DEBUG >= 1 + fprintf(stderr, "Could not create socket! Error: %s\n", strerror(errno)); + #endif + return; + } + int on = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (nonblock){ + int flags = fcntl(sock, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(sock, F_SETFL, flags); + } + struct sockaddr_in6 addr; + addr.sin6_family = AF_INET6; + addr.sin6_port = htons(port);//set port + if (hostname == "0.0.0.0"){ + addr.sin6_addr = in6addr_any; + }else{ + inet_pton(AF_INET6, hostname.c_str(), &addr.sin6_addr);//set interface, 0.0.0.0 (default) is all + } + int ret = bind(sock, (sockaddr*)&addr, sizeof(addr));//do the actual bind + if (ret == 0){ + ret = listen(sock, 100);//start listening, backlog of 100 allowed + if (ret == 0){ + return; + }else{ + #if DEBUG >= 1 + fprintf(stderr, "Listen failed! Error: %s\n", strerror(errno)); + #endif + close(); + return; + } + }else{ + #if DEBUG >= 1 + fprintf(stderr, "Binding failed! Error: %s\n", strerror(errno)); + #endif + close(); + return; + } +}//Socket::Server TCP Constructor + +/// Create a new Unix Server. The socket is immediately bound and set to listen. +/// A maximum of 100 connections will be accepted between accept() calls. +/// Any further connections coming in will be dropped. +/// The address used will first be unlinked - so it succeeds if the Unix socket already existed. Watch out for this behaviour - it will delete any file located at address! +/// \param address The location of the Unix socket to bind to. +/// \param nonblock (optional) Whether accept() calls will be nonblocking. Default is false (blocking). +Socket::Server::Server(std::string address, bool nonblock){ + unlink(address.c_str()); + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0){ + #if DEBUG >= 1 + fprintf(stderr, "Could not create socket! Error: %s\n", strerror(errno)); + #endif + return; + } + if (nonblock){ + int flags = fcntl(sock, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(sock, F_SETFL, flags); + } + sockaddr_un addr; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, address.c_str(), address.size()+1); + int ret = bind(sock, (sockaddr*)&addr, sizeof(addr)); + if (ret == 0){ + ret = listen(sock, 100);//start listening, backlog of 100 allowed + if (ret == 0){ + return; + }else{ + #if DEBUG >= 1 + fprintf(stderr, "Listen failed! Error: %s\n", strerror(errno)); + #endif + close(); + return; + } + }else{ + #if DEBUG >= 1 + fprintf(stderr, "Binding failed! Error: %s\n", strerror(errno)); + #endif + close(); + return; + } +}//Socket::Server Unix Constructor + +/// Accept any waiting connections. If the Socket::Server is blocking, this function will block until there is an incoming connection. +/// If the Socket::Server is nonblocking, it might return a Socket::Connection that is not connected, so check for this. +/// \param nonblock (optional) Whether the newly connected socket should be nonblocking. Default is false (blocking). +/// \returns A Socket::Connection, which may or may not be connected, depending on settings and circumstances. +Socket::Connection Socket::Server::accept(bool nonblock){ + if (sock < 0){return Socket::Connection(-1);} + struct sockaddr_in6 addrinfo; + socklen_t len = sizeof(addrinfo); + static char addrconv[INET6_ADDRSTRLEN]; + int r = ::accept(sock, (sockaddr*)&addrinfo, &len); + //set the socket to be nonblocking, if requested. + //we could do this through accept4 with a flag, but that call is non-standard... + if ((r >= 0) && nonblock){ + int flags = fcntl(r, F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(r, F_SETFL, flags); + } + Socket::Connection tmp(r); + if (r < 0){ + if (errno != EWOULDBLOCK && errno != EAGAIN){close();} + }else{ + if (addrinfo.sin6_family == AF_INET6){ + tmp.remotehost = inet_ntop(AF_INET6, &(addrinfo.sin6_addr), addrconv, INET6_ADDRSTRLEN); + #if DEBUG >= 4 + printf("IPv6 addr: %s\n", tmp.remotehost.c_str()); + #endif + } + if (addrinfo.sin6_family == AF_INET){ + tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in*)&addrinfo)->sin_addr), addrconv, INET6_ADDRSTRLEN); + #if DEBUG >= 4 + printf("IPv4 addr: %s\n", tmp.remotehost.c_str()); + #endif + } + if (addrinfo.sin6_family == AF_UNIX){ + #if DEBUG >= 4 + tmp.remotehost = ((sockaddr_un*)&addrinfo)->sun_path; + printf("Unix addr: %s\n", tmp.remotehost.c_str()); + #endif + tmp.remotehost = "UNIX_SOCKET"; + } + } + return tmp; +} + +/// Close connection. The internal socket is closed and then set to -1. +void Socket::Server::close(){ + shutdown(sock, SHUT_RDWR); + ::close(sock); + sock = -1; +}//Socket::Server::close + +/// Returns the connected-state for this socket. +/// Note that this function might be slightly behind the real situation. +/// The connection status is updated after every accept attempt, when errors occur +/// and when the socket is closed manually. +/// \returns True if socket is connected, false otherwise. +bool Socket::Server::connected(){ + return (sock >= 0); +}//Socket::Server::connected + +/// Returns internal socket number. +int Socket::Server::getSocket(){return sock;} diff --git a/util/socket.h b/util/socket.h new file mode 100644 index 00000000..2b982563 --- /dev/null +++ b/util/socket.h @@ -0,0 +1,65 @@ +/// \file socket.h +/// A handy Socket wrapper library. +/// Written by Jaron Vietor in 2010 for DDVTech + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +///Holds Socket tools. +namespace Socket{ + + /// This class is for easy communicating through sockets, either TCP or Unix. + class Connection{ + private: + int sock; ///< Internally saved socket number. + std::string remotehost; ///< Stores remote host address. + public: + Connection(); ///< Create a new disconnected base socket. + Connection(int sockNo); ///< Create a new base socket. + Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket. + bool Error; ///< Set to true if a socket error happened. + bool Blocking; ///< Set to true if a socket is currently or wants to be blocking. + signed int ready(); ///< Returns the ready-state for this socket. + bool connected(); ///< Returns the connected-state for this socket. + bool read(void * buffer, int len); ///< Reads data from socket. + bool read(void * buffer, int width, int count); ///< Read call that is compatible with file access syntax. + bool write(const void * buffer, int len); ///< Writes data to socket. + bool write(void * buffer, int width, int count); ///< Write call that is compatible with file access syntax. + bool write(const std::string data); ///< Write call that is compatible with std::string. + int iwrite(void * buffer, int len); ///< Incremental write call. + int iread(void * buffer, int len); ///< Incremental read call. + bool read(std::string & buffer); ///< Read call that is compatible with std::string. + bool swrite(std::string & buffer); ///< Read call that is compatible with std::string. + bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string. + bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string. + void close(); ///< Close connection. + std::string getHost(); ///< Gets hostname for connection, if available. + int getSocket(); ///< Returns internal socket number. + friend class Server; + }; + + /// This class is for easily setting up listening socket, either TCP or Unix. + class Server{ + private: + int sock; ///< Internally saved socket number. + public: + Server(); ///< Create a new base Server. + Server(int port, std::string hostname = "0.0.0.0", bool nonblock = false); ///< Create a new TCP Server. + Server(std::string adres, bool nonblock = false); ///< Create a new Unix Server. + Connection accept(bool nonblock = false); ///< Accept any waiting connections. + bool connected(); ///< Returns the connected-state for this socket. + void close(); ///< Close connection. + int getSocket(); ///< Returns internal socket number. + }; + +}; From 73e0abb40ca1d999cb4ee0b9addcf1e0aa72636c Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 22 Jun 2011 00:14:31 +0200 Subject: [PATCH 2/5] Attempted RTMP fix --- Connector_RTMP/main.cpp | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 88ff46cf..59da9c73 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -279,22 +279,24 @@ void Connector_RTMP::parseChunk(){ if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} #endif Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) + Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); amfreply.addContent(AMF::Object("", "_result"));//result success amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(AMF::Object(""));//server properties - amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,5,4,1004")); - amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)127)); - amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); + amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,0,1,123")); + amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31)); + //amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); amfreply.addContent(AMF::Object(""));//info amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded.")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337)); amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding)); - amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); - amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); + //amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); + //amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); #if DEBUG >= 4 amfreply.Print(); #endif @@ -302,7 +304,7 @@ void Connector_RTMP::parseChunk(){ //send onBWDone packet - no clue what it is, but real server sends it... amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); amfreply.addContent(AMF::Object("", "onBWDone"));//result - amfreply.addContent(AMF::Object("", (double)0));//zero + amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); parsed3 = true; @@ -431,7 +433,10 @@ void Connector_RTMP::parseChunk(){ if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} #endif Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) + Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send window acknowledgement size (msg 5) Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + RTMPStream::chunk_snd_max = 4096; + Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); amfreply.addContent(AMF::Object("", "_result"));//result success @@ -518,7 +523,7 @@ void Connector_RTMP::parseChunk(){ amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1)); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); #if DEBUG >= 4 amfreply.Print(); #endif @@ -531,8 +536,7 @@ void Connector_RTMP::parseChunk(){ amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1)); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); #if DEBUG >= 4 amfreply.Print(); #endif From c264b6f91228d26319c91348b788007a55fd44d0 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 22 Jun 2011 02:18:58 +0200 Subject: [PATCH 3/5] Attempted RTMP fix 2 --- util/rtmpchunks.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/util/rtmpchunks.cpp b/util/rtmpchunks.cpp index a63a2380..9ab9aaa7 100644 --- a/util/rtmpchunks.cpp +++ b/util/rtmpchunks.cpp @@ -42,7 +42,7 @@ std::string RTMPStream::Chunk::Pack(){ unsigned int tmpi; unsigned char chtype = 0x00; timestamp -= firsttime; - if (prev.cs_id == cs_id){ + if ((prev.msg_type_id > 0) && (prev.cs_id == cs_id)){ if (msg_stream_id == prev.msg_stream_id){ chtype = 0x40;//do not send msg_stream_id if (len == prev.len){ @@ -292,6 +292,7 @@ bool RTMPStream::Chunk::Parse(std::string & indata){ break; case 0x40: if (indata.size() < i+7) return false; //can't read whole header + if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0x40 with no valid previous chunk!\n");} timestamp = indata[i++]*256*256; timestamp += indata[i++]*256; timestamp += indata[i++]; @@ -305,6 +306,7 @@ bool RTMPStream::Chunk::Parse(std::string & indata){ break; case 0x80: if (indata.size() < i+3) return false; //can't read whole header + if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0x80 with no valid previous chunk!\n");} timestamp = indata[i++]*256*256; timestamp += indata[i++]*256; timestamp += indata[i++]; @@ -315,6 +317,7 @@ bool RTMPStream::Chunk::Parse(std::string & indata){ msg_stream_id = prev.msg_stream_id; break; case 0xC0: + if (prev.msg_type_id == 0){fprintf(stderr, "Warning: Header type 0xC0 with no valid previous chunk!\n");} timestamp = prev.timestamp; len = prev.len; len_left = prev.len_left; From e6c0efaa13b7b2d11b7cf58b26d79812f3345b62 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 22 Jun 2011 02:20:49 +0200 Subject: [PATCH 4/5] RTMP Parser was not in git - added --- RTMP_Parser/Makefile | 20 +++++++ RTMP_Parser/main.cpp | 123 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100644 RTMP_Parser/Makefile create mode 100644 RTMP_Parser/main.cpp diff --git a/RTMP_Parser/Makefile b/RTMP_Parser/Makefile new file mode 100644 index 00000000..95aa3b19 --- /dev/null +++ b/RTMP_Parser/Makefile @@ -0,0 +1,20 @@ +SRC = main.cpp ../util/amf.cpp ../util/rtmpchunks.cpp ../util/crypto.cpp +OBJ = $(SRC:.cpp=.o) +OUT = RTMP_Parser +INCLUDES = +STATIC = +CCFLAGS = -Wall -Wextra -funsigned-char -g +CC = $(CROSS)g++ +LD = $(CROSS)ld +AR = $(CROSS)ar +LIBS = -lssl -lcrypto +.SUFFIXES: .cpp +.PHONY: clean default +default: $(OUT) +.cpp.o: + $(CC) $(INCLUDES) $(CCFLAGS) -c $< -o $@ +$(OUT): $(OBJ) + $(CC) -o $(OUT) $(OBJ) $(STATIC) $(LIBS) +clean: + rm -rf $(OBJ) $(OUT) Makefile.bak *~ + diff --git a/RTMP_Parser/main.cpp b/RTMP_Parser/main.cpp new file mode 100644 index 00000000..665d1621 --- /dev/null +++ b/RTMP_Parser/main.cpp @@ -0,0 +1,123 @@ +/// \file RTMP_Parser/main.cpp +/// Debugging tool for RTMP data. +/// Expects RTMP data of one side of the conversion through stdin, outputs human-readable information to stderr. +/// Automatically skips 3073 bytes of handshake data. + +#define DEBUG 10 //maximum debugging level +#include +#include +#include +#include +#include "../util/amf.h" +#include "../util/rtmpchunks.h" + +/// Debugging tool for RTMP data. +/// Expects RTMP data of one side of the conversion through stdin, outputs human-readable information to stderr. +/// Will output FLV file to stdout, if available +/// Automatically skips 3073 bytes of handshake data. +int main(){ + + std::string inbuffer; + while (std::cin.good()){inbuffer += std::cin.get();}//read all of std::cin to temp + inbuffer.erase(0, 3073);//strip the handshake part + RTMPStream::Chunk next; + AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER); + + + while (next.Parse(inbuffer)){ + switch (next.msg_type_id){ + case 0://does not exist + fprintf(stderr, "Error chunk - %i, %i, %i, %i, %i\n", next.cs_id, next.timestamp, next.real_len, next.len_left, next.msg_stream_id); + //return 0; + break;//happens when connection breaks unexpectedly + case 1://set chunk size + RTMPStream::chunk_rec_max = ntohl(*(int*)next.data.c_str()); + fprintf(stderr, "CTRL: Set chunk size: %i\n", RTMPStream::chunk_rec_max); + break; + case 2://abort message - we ignore this one + fprintf(stderr, "CTRL: Abort message: %i\n", ntohl(*(int*)next.data.c_str())); + //4 bytes of stream id to drop + break; + case 3://ack + RTMPStream::snd_window_at = ntohl(*(int*)next.data.c_str()); + fprintf(stderr, "CTRL: Acknowledgement: %i\n", RTMPStream::snd_window_at); + break; + case 4:{ + short int ucmtype = ntohs(*(short int*)next.data.c_str()); + switch (ucmtype){ + case 0: + fprintf(stderr, "CTRL: User control message: stream begin %i\n", ntohl(*(int*)next.data.c_str()+2)); + break; + case 1: + fprintf(stderr, "CTRL: User control message: stream EOF %i\n", ntohl(*(int*)next.data.c_str()+2)); + break; + case 2: + fprintf(stderr, "CTRL: User control message: stream dry %i\n", ntohl(*(int*)next.data.c_str()+2)); + break; + case 3: + fprintf(stderr, "CTRL: User control message: setbufferlen %i\n", ntohl(*(int*)next.data.c_str()+2)); + break; + case 4: + fprintf(stderr, "CTRL: User control message: streamisrecorded %i\n", ntohl(*(int*)next.data.c_str()+2)); + break; + case 6: + fprintf(stderr, "CTRL: User control message: pingrequest %i\n", ntohl(*(int*)next.data.c_str()+2)); + break; + case 7: + fprintf(stderr, "CTRL: User control message: pingresponse %i\n", ntohl(*(int*)next.data.c_str()+2)); + break; + default: + fprintf(stderr, "CTRL: User control message: UNKNOWN %hi - %i\n", ucmtype, ntohl(*(int*)next.data.c_str()+2)); + break; + } + } break; + case 5://window size of other end + RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str()); + RTMPStream::rec_window_at = RTMPStream::rec_cnt; + fprintf(stderr, "CTRL: Window size: %i\n", RTMPStream::rec_window_size); + break; + case 6: + RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str()); + //4 bytes window size, 1 byte limit type (ignored) + fprintf(stderr, "CTRL: Set peer bandwidth: %i\n", RTMPStream::snd_window_size); + break; + case 8: + fprintf(stderr, "Received %i bytes audio data\n", next.len); + break; + case 9: + fprintf(stderr, "Received %i bytes video data\n", next.len); + break; + case 15: + fprintf(stderr, "Received AFM3 data message\n"); + break; + case 16: + fprintf(stderr, "Received AFM3 shared object\n"); + break; + case 17: + fprintf(stderr, "Received AFM3 command message\n"); + break; + case 18:{ + fprintf(stderr, "Received AFM0 data message (metadata):\n"); + amfdata = AMF::parse(next.data); + amfdata.Print(); + } break; + case 19: + fprintf(stderr, "Received AFM0 shared object\n"); + break; + case 20:{//AMF0 command message + fprintf(stderr, "Received AFM0 command message:\n"); + amfdata = AMF::parse(next.data); + amfdata.Print(); + } break; + case 22: + fprintf(stderr, "Received aggregate message\n"); + break; + default: + fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n"); + return 1; + break; + }//switch for type of chunk + }//while chunk parsed + fprintf(stderr, "No more readable data\n"); + return 0; +}//main \ No newline at end of file From c0717e24a37cd3ab45ca4c76512d1fd2e167ff31 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 22 Jun 2011 02:38:11 +0200 Subject: [PATCH 5/5] More fixing attempts... --- Connector_RTMP/main.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 59da9c73..5e34521e 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -279,7 +279,7 @@ void Connector_RTMP::parseChunk(){ if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} #endif Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) - Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) + Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send window acknowledgement size (msg 5) Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); @@ -432,27 +432,27 @@ void Connector_RTMP::parseChunk(){ if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} #endif - Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) - Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send window acknowledgement size (msg 5) - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 RTMPStream::chunk_snd_max = 4096; Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) + Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) + Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6) + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); amfreply.addContent(AMF::Object("", "_result"));//result success amfreply.addContent(amfdata.getContent(1));//same transaction ID - // amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info amfreply.addContent(AMF::Object(""));//server properties - amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,5,4,1004")); - amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)127)); - amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); + amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,0,1,123")); + amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31)); + //amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); amfreply.addContent(AMF::Object(""));//info amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded.")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337)); amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding)); - amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); - amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); + //amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); + //amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); #if DEBUG >= 4 amfreply.Print(); #endif