diff --git a/lib/flv_tag.cpp b/lib/flv_tag.cpp index 2c005dc1..89694a3f 100644 --- a/lib/flv_tag.cpp +++ b/lib/flv_tag.cpp @@ -627,79 +627,6 @@ bool FLV::Tag::MemLoader(char * D, unsigned int S, unsigned int & P){ }//Tag::MemLoader -/// Helper function for FLV::SockLoader. -/// This function will try to read count bytes from socket sock into buffer. -/// This function should be called repeatedly until true. -/// \param buffer The target buffer. -/// \param count Amount of bytes to read. -/// \param sofar Current amount read. -/// \param sock Socket to read from. -/// \return True if count bytes are read succesfully, false otherwise. -bool FLV::Tag::SockReadUntil(char * buffer, unsigned int count, unsigned int & sofar, Socket::Connection & sock){ - if (sofar >= count){return true;} - int r = 0; - r = sock.iread(buffer + sofar,count-sofar); - sofar += r; - if (sofar >= count){return true;} - return false; -}//Tag::SockReadUntil - -/// Try to load a tag from a socket. -/// This is a stateful function - if fed incorrect data, it will most likely never return true again! -/// While this function returns false, the Tag might not contain valid data. -/// \param sock The socket to read from. -/// \return True if a whole tag is succesfully read, false otherwise. -bool FLV::Tag::SockLoader(Socket::Connection sock){ - if (buf < 15){data = (char*)realloc(data, 15); buf = 15;} - if (done){ - if (SockReadUntil(data, 11, sofar, sock)){ - //if its a correct FLV header, throw away and read tag header - if (FLV::is_header(data)){ - if (SockReadUntil(data, 13, sofar, sock)){ - if (FLV::check_header(data)){ - sofar = 0; - memcpy(FLV::Header, data, 13); - }else{FLV::Parse_Error = true; Error_Str = "Invalid header received."; return false;} - } - }else{ - //if a tag header, calculate length and read tag body - len = data[3] + 15; - len += (data[2] << 8); - len += (data[1] << 16); - if (buf < len){data = (char*)realloc(data, len); buf = len;} - if (data[0] > 0x12){ - data[0] += 32; - FLV::Parse_Error = true; - Error_Str = "Invalid Tag received ("; - Error_Str += data[0]; - Error_Str += ")."; - return false; - } - done = false; - } - } - }else{ - //read tag body - if (SockReadUntil(data, len, sofar, sock)){ - //calculate keyframeness, next time read header again, return true - if ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)){isKeyframe = true;}else{isKeyframe = false;} - done = true; - sofar = 0; - return true; - } - } - return false; -}//Tag::SockLoader - -/// Try to load a tag from a socket. -/// This is a stateful function - if fed incorrect data, it will most likely never return true again! -/// While this function returns false, the Tag might not contain valid data. -/// \param sock The socket to read from. -/// \return True if a whole tag is succesfully read, false otherwise. -bool FLV::Tag::SockLoader(int sock){ - return SockLoader(Socket::Connection(sock)); -}//Tag::SockLoader - /// Helper function for FLV::FileLoader. /// This function will try to read count bytes from file f into buffer. /// This function should be called repeatedly until true. diff --git a/lib/flv_tag.h b/lib/flv_tag.h index 4e61fc6a..8de48eb5 100644 --- a/lib/flv_tag.h +++ b/lib/flv_tag.h @@ -46,8 +46,6 @@ namespace FLV { bool DTSCMetaInit(DTSC::Stream & S); JSON::Value toJSON(JSON::Value & metadata); bool MemLoader(char * D, unsigned int S, unsigned int & P); - bool SockLoader(int sock); - bool SockLoader(Socket::Connection sock); bool FileLoader(FILE * f); protected: int buf; ///< Maximum length of buffer space. @@ -56,7 +54,6 @@ namespace FLV { void setLen(); //loader helper functions bool MemReadUntil(char * buffer, unsigned int count, unsigned int & sofar, char * D, unsigned int S, unsigned int & P); - bool SockReadUntil(char * buffer, unsigned int count, unsigned int & sofar, Socket::Connection & sock); bool FileReadUntil(char * buffer, unsigned int count, unsigned int & sofar, FILE * f); //JSON writer helpers void Meta_Put(JSON::Value & meta, std::string cat, std::string elem, std::string val); diff --git a/lib/socket.cpp b/lib/socket.cpp index 518c77db..9a230118 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -160,55 +160,6 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ } }//Socket::Connection TCP Contructor -/// Calls poll() on the socket, checking if data is available. -/// This function may return true even if there is no data, but never returns false when there is. -bool Socket::Connection::canRead(){ - struct pollfd PFD; - PFD.fd = sock; - PFD.events = POLLIN; - PFD.revents = 0; - poll(&PFD, 1, 5); - return (PFD.revents & POLLIN) == POLLIN; -} -/// Calls poll() on the socket, checking if data can be written. -bool Socket::Connection::canWrite(){ - struct pollfd PFD; - PFD.fd = sock; - PFD.events = POLLOUT; - PFD.revents = 0; - poll(&PFD, 1, 5); - return (PFD.revents & POLLOUT) == POLLOUT; -} - - -/// Returns the ready-state for this socket. -/// \returns 1 if data is waiting to be read, -1 if not connected, 0 otherwise. -signed int Socket::Connection::ready(){ - if (sock < 0) return -1; - char tmp; - int preflags = fcntl(sock, F_GETFL, 0); - int postflags = preflags | O_NONBLOCK; - fcntl(sock, F_SETFL, postflags); - int r = recv(sock, &tmp, 1, MSG_PEEK); - fcntl(sock, F_SETFL, preflags); - if (r < 0){ - if (errno == EAGAIN || errno == EWOULDBLOCK){ - return 0; - }else{ - #if DEBUG >= 2 - fprintf(stderr, "Socket ready error! Error: %s\n", strerror(errno)); - #endif - close(); - return -1; - } - } - if (r == 0){ - close(); - return -1; - } - return r; -} - /// Returns the connected-state for this socket. /// Note that this function might be slightly behind the real situation. /// The connection status is updated after every read/write attempt, when errors occur @@ -241,6 +192,17 @@ bool Socket::Connection::spool(){ return iread(downbuffer); } +/// Updates the downbuffer and upbuffer internal variables until upbuffer is empty. +/// Returns true if new data was received, false otherwise. +bool Socket::Connection::flush(){ + while (upbuffer.size() > 0 && connected()){ + iwrite(upbuffer); + usleep(5000);//sleep 5 ms + } + return iread(downbuffer); +} + + /// Returns a reference to the download buffer. std::string & Socket::Connection::Received(){ return downbuffer; @@ -251,81 +213,6 @@ void Socket::Connection::Send(std::string data){ upbuffer.append(data); } -/// Writes data to socket. This function blocks if the socket is blocking and all data cannot be written right away. -/// If the socket is nonblocking and not all data can be written, this function sets internal variable Blocking to true -/// and returns false. -/// \param buffer Location of the buffer to write from. -/// \param len Amount of bytes to write. -/// \returns True if the whole write was succesfull, false otherwise. -bool Socket::Connection::write(const void * buffer, int len){ - int sofar = 0; - if (sock < 0){return false;} - while (sofar != len){ - int r = send(sock, (char*)buffer + sofar, len-sofar, 0); - if (r <= 0){ - Error = true; - #if DEBUG >= 2 - fprintf(stderr, "Could not write data! Error: %s\n", strerror(errno)); - #endif - close(); - up += sofar; - return false; - }else{ - sofar += r; - } - } - up += sofar; - return true; -}//DDv::Socket::write - -/// Reads data from socket. This function blocks if the socket is blocking and all data cannot be read right away. -/// If the socket is nonblocking and not all data can be read, this function sets internal variable Blocking to true -/// and returns false. -/// \param buffer Location of the buffer to read to. -/// \param len Amount of bytes to read. -/// \returns True if the whole read was succesfull, false otherwise. -bool Socket::Connection::read(const void * buffer, int len){ - int sofar = 0; - if (sock < 0){return false;} - while (sofar != len){ - int r = recv(sock, (char*)buffer + sofar, len-sofar, 0); - if (r < 0){ - switch (errno){ - case EWOULDBLOCK: - down += sofar; - return 0; - break; - default: - Error = true; - #if DEBUG >= 2 - fprintf(stderr, "Could not read data! Error %i: %s\n", r, strerror(errno)); - #endif - close(); - down += sofar; - break; - } - return false; - }else{ - if (r == 0){ - Error = true; - close(); - down += sofar; - return false; - } - sofar += r; - } - } - down += sofar; - return true; -}//Socket::Connection::read - -/// Read call that is compatible with file access syntax. This function simply calls the other read function. -bool Socket::Connection::read(const void * buffer, int width, int count){return read(buffer, width*count);} -/// Write call that is compatible with file access syntax. This function simply calls the other write function. -bool Socket::Connection::write(const void * buffer, int width, int count){return write(buffer, width*count);} -/// Write call that is compatible with std::string. This function simply calls the other write function. -bool Socket::Connection::write(const std::string data){return write(data.c_str(), data.size());} - /// Incremental write call. This function tries to write len bytes to the socket from the buffer, /// returning the amount of bytes it actually wrote. /// \param buffer Location of the buffer to write from. @@ -336,12 +223,16 @@ int Socket::Connection::iwrite(const void * buffer, int len){ int r = send(sock, buffer, len, 0); if (r < 0){ switch (errno){ - case EWOULDBLOCK: return 0; break; + case EWOULDBLOCK: + return 0; + break; default: - Error = true; - #if DEBUG >= 2 - fprintf(stderr, "Could not iwrite data! Error: %s\n", strerror(errno)); - #endif + if (errno != EPIPE){ + Error = true; + #if DEBUG >= 2 + fprintf(stderr, "Could not iwrite data! Error: %s\n", strerror(errno)); + #endif + } close(); return 0; break; @@ -364,12 +255,16 @@ int Socket::Connection::iread(void * buffer, int len){ int r = recv(sock, buffer, len, 0); if (r < 0){ switch (errno){ - case EWOULDBLOCK: return 0; break; + case EWOULDBLOCK: + return 0; + break; default: - Error = true; - #if DEBUG >= 2 - fprintf(stderr, "Could not iread data! Error: %s\n", strerror(errno)); - #endif + if (errno != EPIPE){ + Error = true; + #if DEBUG >= 2 + fprintf(stderr, "Could not iread data! Error: %s\n", strerror(errno)); + #endif + } close(); return 0; break; @@ -382,23 +277,6 @@ int Socket::Connection::iread(void * buffer, int len){ return r; }//Socket::Connection::iread -/// Read call that is compatible with std::string. -/// Data is read using iread (which is nonblocking if the Socket::Connection itself is), -/// then appended to end of buffer. This functions reads at least one byte before returning. -/// \param buffer std::string to append data to. -/// \return True if new data arrived, false otherwise. -bool Socket::Connection::read(std::string & buffer){ - char cbuffer[5000]; - if (!read(cbuffer, 1)){return false;} - int num = iread(cbuffer+1, 4999); - if (num > 0){ - buffer.append(cbuffer, num+1); - }else{ - buffer.append(cbuffer, 1); - } - return true; -}//read - /// Read call that is compatible with std::string. /// Data is read using iread (which is nonblocking if the Socket::Connection itself is), /// then appended to end of buffer. @@ -425,18 +303,6 @@ bool Socket::Connection::iwrite(std::string & buffer){ return true; }//iwrite -/// Write call that is compatible with std::string. -/// Data is written using write (which is always blocking), -/// then removed from front of buffer. -/// \param buffer std::string to remove data from. -/// \return True if more data was sent, false otherwise. -bool Socket::Connection::swrite(std::string & buffer){ - if (buffer.size() < 1){return false;} - bool tmp = write((void*)buffer.c_str(), buffer.size()); - if (tmp){buffer = "";} - return tmp; -}//write - /// Gets hostname for connection, if available. std::string Socket::Connection::getHost(){ return remotehost; diff --git a/lib/socket.h b/lib/socket.h index 2a4f0418..db902c66 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -14,6 +14,8 @@ #include #include +//for being friendly with Socket::Connection down below +namespace Buffer{class user;}; ///Holds Socket tools. namespace Socket{ @@ -28,35 +30,32 @@ namespace Socket{ unsigned int conntime; std::string downbuffer; ///< Stores temporary data coming in. std::string upbuffer; ///< Stores temporary data going out. + int iread(void * buffer, int len); ///< Incremental read call. + int iwrite(const void * buffer, int len); ///< Incremental write call. + bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string. + bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string. public: + //friends + friend class Buffer::user; + //constructors Connection(); ///< Create a new disconnected base socket. Connection(int sockNo); ///< Create a new base socket. Connection(std::string hostname, int port, bool nonblock); ///< Create a new TCP socket. Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket. - void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). - bool canRead(); ///< Calls poll() on the socket, checking if data is available. - bool canWrite(); ///< Calls poll() on the socket, checking if data can be written. - signed int ready(); ///< Returns the ready-state for this socket. - bool connected() const; ///< Returns the connected-state for this socket. - bool read(const void * buffer, int len); ///< Reads data from socket. - bool read(const void * buffer, int width, int count); ///< Read call that is compatible with file access syntax. - bool write(const void * buffer, int len); ///< Writes data to socket. - bool write(const void * buffer, int width, int count); ///< Write call that is compatible with file access syntax. - bool write(const std::string data); ///< Write call that is compatible with std::string. - int iwrite(const void * buffer, int len); ///< Incremental write call. - int iread(void * buffer, int len); ///< Incremental read call. - bool read(std::string & buffer); ///< Read call that is compatible with std::string. - bool swrite(std::string & buffer); ///< Write call that is compatible with std::string. - bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string. - bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string. - bool spool(); ///< Updates the downbuffer and upbuffer internal variables. - std::string & Received(); ///< Returns a reference to the download buffer. - void Send(std::string data); ///< Appends data to the upbuffer. + //generic methods void close(); ///< Close connection. + void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). std::string getHost(); ///< Gets hostname for connection, if available. void setHost(std::string host); ///< Sets hostname for connection manually. int getSocket(); ///< Returns internal socket number. std::string getError(); ///< Returns a string describing the last error that occured. + bool connected() const; ///< Returns the connected-state for this socket. + //buffered i/o methods + bool spool(); ///< Updates the downbuffer and upbuffer internal variables. + bool flush(); ///< Updates the downbuffer and upbuffer internal variables until upbuffer is empty. + std::string & Received(); ///< Returns a reference to the download buffer. + void Send(std::string data); ///< Appends data to the upbuffer. + //stats related methods unsigned int dataUp(); ///< Returns total amount of bytes sent. unsigned int dataDown(); ///< Returns total amount of bytes received. std::string getStats(std::string C); ///< Returns a std::string of stats, ended by a newline.