diff --git a/Buffer/main.cpp b/Buffer/main.cpp index c7c57ef9..a458cbd1 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -20,6 +20,20 @@ /// Holds all code unique to the Buffer. namespace Buffer{ + class user;//forward declaration + JSON::Value Storage; ///< Global storage of data. + DTSC::Stream * Strm = 0; + std::string waiting_ip = ""; ///< IP address for media push. + Socket::Connection ip_input; ///< Connection used for media push. + tthread::mutex stats_mutex; ///< Mutex for stats modifications. + tthread::mutex transfer_mutex; ///< Mutex for data transfers. + tthread::mutex socket_mutex; ///< Mutex for user deletion/work. + bool buffer_running = true; ///< Set to false when shutting down. + std::vector users; ///< All connected users. + std::vector::iterator usersIt; ///< Iterator for all connected users. + std::string name; ///< Name for this buffer. + tthread::condition_variable moreData; ///< Triggered when more data becomes available. + /// Gets the current system time in milliseconds. unsigned int getNowMS(){ timeval t; @@ -28,146 +42,50 @@ namespace Buffer{ }//getNowMS - JSON::Value Storage; ///< Global storage of data. - ///A simple signal handler that ignores all signals. void termination_handler (int signum){ switch (signum){ + case SIGKILL: buffer_running = false; break; case SIGPIPE: return; break; default: return; break; } } +} - DTSC::Stream * Strm = 0; - std::string waiting_ip = ""; ///< IP address for media push. - Socket::Connection ip_input; ///< Connection used for media push. - - /// Converts a stats line to up, down, host, connector and conntime values. - class Stats{ - public: - unsigned int up; - unsigned int down; - std::string host; - std::string connector; - unsigned int conntime; - Stats(){ - up = 0; - down = 0; - conntime = 0; - } - Stats(std::string s){ - size_t f = s.find(' '); - if (f != std::string::npos){ - host = s.substr(0, f); - s.erase(0, f+1); - } - f = s.find(' '); - if (f != std::string::npos){ - connector = s.substr(0, f); - s.erase(0, f+1); - } - f = s.find(' '); - if (f != std::string::npos){ - conntime = atoi(s.substr(0, f).c_str()); - s.erase(0, f+1); - } - f = s.find(' '); - if (f != std::string::npos){ - up = atoi(s.substr(0, f).c_str()); - s.erase(0, f+1); - down = atoi(s.c_str()); +#include "stats.cpp" +#include "user.cpp" + +namespace Buffer{ + void handleStats(void * empty){ + if (empty != 0){return;} + Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); + while (buffer_running){ + usleep(1000000); //sleep one second + unsigned int now = time(0); + unsigned int tot_up = 0, tot_down = 0, tot_count = 0; + stats_mutex.lock(); + if (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + tot_down += usersIt->curr_down; + tot_up += usersIt->curr_up; + tot_count++; } } - }; - - /// Holds connected users. - /// Keeps track of what buffer users are using and the connection status. - class user{ - public: - tthread::thread * Thread; ///< Holds the thread dealing with this user. - DTSC::Ring * myRing; ///< Ring of the buffer for this user. - int MyNum; ///< User ID of this user. - std::string MyStr; ///< User ID of this user as a string. - int currsend; ///< Current amount of bytes sent. - Stats lastStats; ///< Holds last known stats for this connection. - unsigned int curr_up; ///< Holds the current estimated transfer speed up. - unsigned int curr_down; ///< Holds the current estimated transfer speed down. - bool gotproperaudio; ///< Whether the user received proper audio yet. - void * lastpointer; ///< Pointer to data part of current buffer. - static int UserCount; ///< Global user counter. - Socket::Connection S; ///< Connection to user - /// Creates a new user from a newly connected socket. - /// Also prints "User connected" text to stdout. - user(Socket::Connection fd){ - S = fd; - MyNum = UserCount++; - std::stringstream st; - st << MyNum; - MyStr = st.str(); - curr_up = 0; - curr_down = 0; - currsend = 0; - myRing = 0; - Thread = 0; - std::cout << "User " << MyNum << " connected" << std::endl; - }//constructor - /// Drops held DTSC::Ring class, if one is held. - ~user(){ - Strm->dropRing(myRing); - }//destructor - /// Disconnects the current user. Doesn't do anything if already disconnected. - /// Prints "Disconnected user" to stdout if disconnect took place. - void Disconnect(std::string reason) { - if (S.connected()){S.close();} - if (Thread != 0){ - if (Thread->joinable()){Thread->join();} - Thread = 0; - } - Storage["curr"].removeMember(MyStr); - Storage["log"][MyStr]["connector"] = lastStats.connector; - Storage["log"][MyStr]["up"] = lastStats.up; - Storage["log"][MyStr]["down"] = lastStats.down; - Storage["log"][MyStr]["conntime"] = lastStats.conntime; - Storage["log"][MyStr]["host"] = lastStats.host; - Storage["log"][MyStr]["start"] = (unsigned int)time(0) - lastStats.conntime; - std::cout << "Disconnected user " << MyStr << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl; - }//Disconnect - /// Tries to send the current buffer, returns true if success, false otherwise. - /// Has a side effect of dropping the connection if send will never complete. - bool doSend(const char * ptr, int len){ - int r = S.iwrite(ptr+currsend, len-currsend); - if (r <= 0){ - if (errno == EWOULDBLOCK){return false;} - Disconnect(S.getError()); - return false; - } - currsend += r; - return (currsend == len); - }//doSend - /// Try to send data to this user. Disconnects if any problems occur. - void Send(){ - if (!myRing){return;}//no ring! - if (!S.connected()){return;}//cancel if not connected - if (myRing->waiting){return;}//still waiting for next buffer? - - if (myRing->starved){ - //if corrupt data, warn and get new DTSC::Ring - std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl; - Strm->dropRing(myRing); - myRing = Strm->getRing(); - } - currsend = 0; - - //try to complete a send - if (doSend(Strm->outPacket(myRing->b).c_str(), Strm->outPacket(myRing->b).length())){ - //switch to next buffer - if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode. - myRing->b--; - currsend = 0; - }//completed a send - }//send - }; - int user::UserCount = 0; + Storage["totals"]["down"] = tot_down; + Storage["totals"]["up"] = tot_up; + Storage["totals"]["count"] = tot_count; + Storage["totals"]["now"] = now; + Storage["totals"]["buffer"] = name; + if (!StatsSocket.connected()){ + StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); + } + if (StatsSocket.connected()){ + StatsSocket.write(Storage.toString()+"\n\n"); + Storage["log"].null(); + } + stats_mutex.unlock(); + } + } void handleUser(void * v_usr){ user * usr = (user*)v_usr; @@ -180,16 +98,17 @@ namespace Buffer{ } while (usr->S.connected()){ + usleep(5000); //sleep 5ms if (usr->S.canRead()){ - std::string tmp = ""; + usr->inbuffer.clear(); char charbuf; while ((usr->S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){ - tmp += charbuf; + usr->inbuffer += charbuf; } - if (tmp != ""){ - if (tmp[0] == 'P'){ - std::cout << "Push attempt from IP " << tmp.substr(2) << std::endl; - if (tmp.substr(2) == waiting_ip){ + if (usr->inbuffer != ""){ + if (usr->inbuffer[0] == 'P'){ + std::cout << "Push attempt from IP " << usr->inbuffer.substr(2) << std::endl; + if (usr->inbuffer.substr(2) == waiting_ip){ if (!ip_input.connected()){ std::cout << "Push accepted!" << std::endl; ip_input = usr->S; @@ -202,25 +121,74 @@ namespace Buffer{ usr->Disconnect("Push denied - invalid IP address!"); } } - if (tmp[0] == 'S'){ - Stats tmpStats = Stats(tmp.substr(2)); - unsigned int secs = tmpStats.conntime - usr->lastStats.conntime; + if (usr->inbuffer[0] == 'S'){ + stats_mutex.lock(); + usr->tmpStats = Stats(usr->inbuffer.substr(2)); + unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; if (secs < 1){secs = 1;} - usr->curr_up = (tmpStats.up - usr->lastStats.up) / secs; - usr->curr_down = (tmpStats.down - usr->lastStats.down) / secs; - usr->lastStats = tmpStats; - Storage["curr"][usr->MyStr]["connector"] = tmpStats.connector; - Storage["curr"][usr->MyStr]["up"] = tmpStats.up; - Storage["curr"][usr->MyStr]["down"] = tmpStats.down; - Storage["curr"][usr->MyStr]["conntime"] = tmpStats.conntime; - Storage["curr"][usr->MyStr]["host"] = tmpStats.host; - Storage["curr"][usr->MyStr]["start"] = (unsigned int) time(0) - tmpStats.conntime; + usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; + usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; + usr->lastStats = usr->tmpStats; + Storage["curr"][usr->MyStr]["connector"] = usr->tmpStats.connector; + Storage["curr"][usr->MyStr]["up"] = usr->tmpStats.up; + Storage["curr"][usr->MyStr]["down"] = usr->tmpStats.down; + Storage["curr"][usr->MyStr]["conntime"] = usr->tmpStats.conntime; + Storage["curr"][usr->MyStr]["host"] = usr->tmpStats.host; + Storage["curr"][usr->MyStr]["start"] = (unsigned int) time(0) - usr->tmpStats.conntime; + stats_mutex.unlock(); } } } usr->Send(); } - usr->Disconnect("Closed"); + stats_mutex.lock(); + if (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + if (!(*usersIt).S.connected()){ + users.erase(usersIt); + break; + } + } + } + stats_mutex.unlock(); + std::cerr << "User " << usr->MyStr << " disconnected, socket number " << usr->S.getSocket() << std::endl; + } + + void handleStdin(void * empty){ + if (empty != 0){return;} + unsigned int lastPacketTime = 0;//time in MS last packet was parsed + unsigned int currPacketTime = 0;//time of the last parsed packet (current packet) + unsigned int prevPacketTime = 0;//time of the previously parsed packet (current packet - 1) + std::string inBuffer; + char charBuffer[1024*10]; + unsigned int charCount; + unsigned int now; + + while (std::cin.good() && buffer_running){ + //slow down packet receiving to real-time + now = getNowMS(); + if ((now - lastPacketTime > currPacketTime - prevPacketTime) || (currPacketTime <= prevPacketTime)){ + std::cin.read(charBuffer, 1024*10); + charCount = std::cin.gcount(); + inBuffer.append(charBuffer, charCount); + transfer_mutex.lock(); + if (Strm->parsePacket(inBuffer)){ + Strm->outPacket(0); + lastPacketTime = now; + prevPacketTime = currPacketTime; + currPacketTime = Strm->getTime(); + moreData.notify_all(); + } + transfer_mutex.unlock(); + }else{ + if (((currPacketTime - prevPacketTime) - (now - lastPacketTime)) > 1000){ + usleep(1000000); + }else{ + usleep(((currPacketTime - prevPacketTime) - (now - lastPacketTime)) * 1000); + } + } + } + buffer_running = false; } /// Starts a loop, waiting for connections to send data to. @@ -231,113 +199,66 @@ namespace Buffer{ sigemptyset (&new_action.sa_mask); new_action.sa_flags = 0; sigaction (SIGPIPE, &new_action, NULL); + sigaction (SIGKILL, &new_action, NULL); //then check and parse the commandline if (argc < 2) { std::cout << "usage: " << argv[0] << " streamName [awaiting_IP]" << std::endl; return 1; } + name = argv[1]; bool ip_waiting = false; if (argc >= 4){ waiting_ip += argv[2]; ip_waiting = true; } std::string shared_socket = "/tmp/shared_socket_"; - shared_socket += argv[1]; + shared_socket += name; - Socket::Server SS(shared_socket, true); + Socket::Server SS(shared_socket, false); Strm = new DTSC::Stream(5); - std::vector users; - std::vector::iterator usersIt; - std::string inBuffer; - char charBuffer[1024*10]; - unsigned int charCount; - unsigned int stattimer = 0; - unsigned int lastPacketTime = 0;//time in MS last packet was parsed - unsigned int currPacketTime = 0;//time of the last parsed packet (current packet) - unsigned int prevPacketTime = 0;//time of the previously parsed packet (current packet - 1) Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); - Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); Storage["log"].null(); Storage["curr"].null(); Storage["totals"].null(); - while (!feof(stdin) || ip_waiting){ - usleep(1000); //sleep for 1 ms, to prevent 100% CPU time - unsigned int now = time(0); - if (now != stattimer){ - stattimer = now; - unsigned int tot_up = 0, tot_down = 0, tot_count = 0; - if (users.size() > 0){ - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - tot_down += usersIt->curr_down; - tot_up += usersIt->curr_up; - tot_count++; - } - } - Storage["totals"]["down"] = tot_down; - Storage["totals"]["up"] = tot_up; - Storage["totals"]["count"] = tot_count; - Storage["totals"]["now"] = now; - Storage["totals"]["buffer"] = argv[1]; - if (!StatsSocket.connected()){ - StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); - } - if (StatsSocket.connected()){ - StatsSocket.write(Storage.toString()+"\n\n"); - Storage["log"].null(); - } - } - //invalidate the current buffer - if ( (!ip_waiting && std_input.canRead()) || (ip_waiting && ip_input.connected()) ){ - //slow down packet receiving to real-time - if ((getNowMS() - lastPacketTime > currPacketTime - prevPacketTime) || (currPacketTime <= prevPacketTime)){ - std::cin.read(charBuffer, 1024*10); - charCount = std::cin.gcount(); - inBuffer.append(charBuffer, charCount); - if (Strm->parsePacket(inBuffer)){ - lastPacketTime = getNowMS(); - prevPacketTime = currPacketTime; - currPacketTime = Strm->getTime(); - } - } - } + //tthread::thread StatsThread = tthread::thread(handleStats, 0); + tthread::thread * StdinThread = 0; + if (!ip_waiting){ + StdinThread = new tthread::thread(handleStdin, 0); + } + while (buffer_running){ //check for new connections, accept them if there are any //starts a thread for every accepted connection incoming = SS.accept(false); if (incoming.connected()){ - std::cerr << "New socket: " << incoming.getSocket() << std::endl; + stats_mutex.lock(); users.push_back(incoming); user * usr_ptr = &(users.back()); + stats_mutex.unlock(); usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr); } - - //erase disconnected users - if (users.size() > 0){ - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - if (!(*usersIt).S.connected()){users.erase(usersIt); break;} - } - } - }//main loop // disconnect listener /// \todo Deal with EOF more nicely - doesn't send the end of the stream to all users! - std::cout << "Reached EOF of input" << std::endl; + buffer_running = false; + std::cout << "Buffer shutting down" << std::endl; SS.close(); + //StatsThread.join(); + if (StdinThread){StdinThread->join();} - while (users.size() > 0){ + if (users.size() > 0){ + stats_mutex.lock(); for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ if ((*usersIt).S.connected()){ (*usersIt).Disconnect("Terminating..."); - }else{ - users.erase(usersIt); - break; } } + stats_mutex.unlock(); } delete Strm; diff --git a/Buffer/stats.cpp b/Buffer/stats.cpp new file mode 100644 index 00000000..25d244d8 --- /dev/null +++ b/Buffer/stats.cpp @@ -0,0 +1,40 @@ + +namespace Buffer{ + /// Converts a stats line to up, down, host, connector and conntime values. + class Stats{ + public: + unsigned int up; + unsigned int down; + std::string host; + std::string connector; + unsigned int conntime; + Stats(){ + up = 0; + down = 0; + conntime = 0; + } + Stats(std::string s){ + size_t f = s.find(' '); + if (f != std::string::npos){ + host = s.substr(0, f); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + connector = s.substr(0, f); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + conntime = atoi(s.substr(0, f).c_str()); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + up = atoi(s.substr(0, f).c_str()); + s.erase(0, f+1); + down = atoi(s.c_str()); + } + } + }; +} diff --git a/Buffer/user.cpp b/Buffer/user.cpp new file mode 100644 index 00000000..3fe1f065 --- /dev/null +++ b/Buffer/user.cpp @@ -0,0 +1,97 @@ +namespace Buffer{ + /// Holds connected users. + /// Keeps track of what buffer users are using and the connection status. + class user{ + public: + tthread::thread * Thread; ///< Holds the thread dealing with this user. + DTSC::Ring * myRing; ///< Ring of the buffer for this user. + int MyNum; ///< User ID of this user. + std::string MyStr; ///< User ID of this user as a string. + std::string inbuffer; ///< Used to buffer input data. + int currsend; ///< Current amount of bytes sent. + Stats lastStats; ///< Holds last known stats for this connection. + Stats tmpStats; ///< Holds temporary stats for this connection. + unsigned int curr_up; ///< Holds the current estimated transfer speed up. + unsigned int curr_down; ///< Holds the current estimated transfer speed down. + bool gotproperaudio; ///< Whether the user received proper audio yet. + void * lastpointer; ///< Pointer to data part of current buffer. + static int UserCount; ///< Global user counter. + Socket::Connection S; ///< Connection to user + /// Creates a new user from a newly connected socket. + /// Also prints "User connected" text to stdout. + user(Socket::Connection fd){ + S = fd; + MyNum = UserCount++; + std::stringstream st; + st << MyNum; + MyStr = st.str(); + curr_up = 0; + curr_down = 0; + currsend = 0; + myRing = 0; + Thread = 0; + std::cout << "User " << MyNum << " connected" << std::endl; + }//constructor + /// Drops held DTSC::Ring class, if one is held. + ~user(){ + Strm->dropRing(myRing); + }//destructor + /// Disconnects the current user. Doesn't do anything if already disconnected. + /// Prints "Disconnected user" to stdout if disconnect took place. + void Disconnect(std::string reason) { + if (S.connected()){S.close();} + if (Thread != 0){ + if (Thread->joinable()){Thread->join();} + Thread = 0; + } + tthread::lock_guard lock(stats_mutex); + Storage["curr"].removeMember(MyStr); + Storage["log"][MyStr]["connector"] = lastStats.connector; + Storage["log"][MyStr]["up"] = lastStats.up; + Storage["log"][MyStr]["down"] = lastStats.down; + Storage["log"][MyStr]["conntime"] = lastStats.conntime; + Storage["log"][MyStr]["host"] = lastStats.host; + Storage["log"][MyStr]["start"] = (unsigned int)time(0) - lastStats.conntime; + std::cout << "Disconnected user " << MyStr << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl; + }//Disconnect + /// Tries to send the current buffer, returns true if success, false otherwise. + /// Has a side effect of dropping the connection if send will never complete. + bool doSend(const char * ptr, int len){ + int r = S.iwrite(ptr+currsend, len-currsend); + if (r <= 0){ + if (errno == EWOULDBLOCK){return false;} + Disconnect(S.getError()); + return false; + } + currsend += r; + return (currsend == len); + }//doSend + /// Try to send data to this user. Disconnects if any problems occur. + void Send(){ + if (!myRing){return;}//no ring! + if (!S.connected()){return;}//cancel if not connected + if (myRing->waiting){ + tthread::lock_guard guard(transfer_mutex); + moreData.wait(transfer_mutex); + return; + }//still waiting for next buffer? + + if (myRing->starved){ + //if corrupt data, warn and get new DTSC::Ring + std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl; + Strm->dropRing(myRing); + myRing = Strm->getRing(); + return; + } + + //try to complete a send + if (doSend(Strm->outPacket(myRing->b).c_str(), Strm->outPacket(myRing->b).length())){ + //switch to next buffer + currsend = 0; + if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode. + myRing->b--; + }//completed a send + }//send + }; + int user::UserCount = 0; +} diff --git a/Connector_HTTP/main.cpp b/Connector_HTTP/main.cpp index b1b6e7cc..0ba6ef71 100644 --- a/Connector_HTTP/main.cpp +++ b/Connector_HTTP/main.cpp @@ -304,28 +304,17 @@ namespace Connector_HTTP{ ss.write(stat); } } - ss.canRead(); - switch (ss.ready()){ - case -1: - conn.close(); - #if DEBUG >= 1 - fprintf(stderr, "Source socket is disconnected.\n"); - #endif - break; - case 0: break;//not ready yet - default: - if (ss.iread(recBuffer)){ - if (Strm.parsePacket(recBuffer)){ - tag.DTSCLoader(Strm); - if (handler == HANDLER_FLASH){ - FlashDynamic(tag, HTTP_S, conn, Strm); - } - if (handler == HANDLER_PROGRESSIVE){ - Progressive(tag, HTTP_S, conn, Strm); - } - } + if (ss.canRead()){ + ss.spool(); + if (Strm.parsePacket(ss.Received())){ + tag.DTSCLoader(Strm); + if (handler == HANDLER_FLASH){ + FlashDynamic(tag, HTTP_S, conn, Strm); } - break; + if (handler == HANDLER_PROGRESSIVE){ + Progressive(tag, HTTP_S, conn, Strm); + } + } } } } diff --git a/tools/DTSC2FLV/Makefile b/tools/DTSC2FLV/Makefile new file mode 100644 index 00000000..23d86bbf --- /dev/null +++ b/tools/DTSC2FLV/Makefile @@ -0,0 +1,23 @@ +SRC = main.cpp ../../util/flv_tag.cpp ../../util/dtsc.cpp ../../util/amf.cpp ../../util/socket.cpp +OBJ = $(SRC:.cpp=.o) +OUT = DDV_DTSC2FLV +INCLUDES = +DEBUG = 4 +OPTIMIZE = -g +CCFLAGS = -Wall -Wextra -funsigned-char $(OPTIMIZE) -DDEBUG=$(DEBUG) +CC = $(CROSS)g++ +LD = $(CROSS)ld +AR = $(CROSS)ar +LIBS = +.SUFFIXES: .cpp +.PHONY: clean default +default: $(OUT) +.cpp.o: + $(CC) $(INCLUDES) $(CCFLAGS) $(LIBS) -c $< -o $@ +$(OUT): $(OBJ) + $(CC) $(LIBS) -o $(OUT) $(OBJ) +clean: + rm -rf $(OBJ) $(OUT) Makefile.bak *~ +install: $(OUT) + cp -f ./$(OUT) /usr/bin/ + diff --git a/tools/DTSC2FLV/main.cpp b/tools/DTSC2FLV/main.cpp new file mode 100644 index 00000000..e2572abb --- /dev/null +++ b/tools/DTSC2FLV/main.cpp @@ -0,0 +1,64 @@ +/// \file DTSC2FLV/main.cpp +/// Contains the code that will transform any valid DTSC input into valid FLVs. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../../util/flv_tag.h" //FLV support +#include "../../util/dtsc.h" //DTSC support +#include "../../util/amf.h" //AMF support + +/// Holds all code that converts filetypes to DTSC. +namespace Converters{ + + /// Reads DTSC from STDIN, outputs FLV to STDOUT. + int DTSC2FLV() { + FLV::Tag FLV_out; // Temporary storage for outgoing FLV data. + DTSC::Stream Strm; + std::string inBuffer; + char charBuffer[1024*10]; + unsigned int charCount; + bool doneheader = false; + + while (std::cin.good()){ + std::cin.read(charBuffer, 1024*10); + charCount = std::cin.gcount(); + inBuffer.append(charBuffer, charCount); + if (Strm.parsePacket(inBuffer)){ + if (!doneheader){ + doneheader = true; + std::cout.write(FLV::Header, 13); + FLV_out.DTSCMetaInit(Strm); + std::cout.write(FLV_out.data, FLV_out.len); + if (Strm.metadata.getContentP("video") && Strm.metadata.getContentP("video")->getContentP("init")){ + FLV_out.DTSCVideoInit(Strm); + std::cout.write(FLV_out.data, FLV_out.len); + } + if (Strm.metadata.getContentP("audio") && Strm.metadata.getContentP("audio")->getContentP("init")){ + FLV_out.DTSCAudioInit(Strm); + std::cout.write(FLV_out.data, FLV_out.len); + } + } + if (FLV_out.DTSCLoader(Strm)){ + std::cout.write(FLV_out.data, FLV_out.len); + } + } + } + + std::cerr << "Done!" << std::endl; + + return 0; + }//FLV2DTSC + +};//Converter namespace + +/// Entry point for DTSC2FLV, simply calls Converters::DTSC2FLV(). +int main(){ + return Converters::DTSC2FLV(); +}//main diff --git a/tools/FLV2DTSC/main.cpp b/tools/FLV2DTSC/main.cpp index 3655432e..1d7156e0 100644 --- a/tools/FLV2DTSC/main.cpp +++ b/tools/FLV2DTSC/main.cpp @@ -217,6 +217,7 @@ namespace Converters{ case 0x40: pack_out.addContent(DTSC::DTMI("keyframe", 1)); break; case 0x50: continue; break;//the video info byte we just throw away - useless to us... } + pack_out.addContent(DTSC::DTMI("time", FLV_in.tagTime())); if ((videodata & 0x0F) == 7){ switch (FLV_in.data[12]){ case 1: pack_out.addContent(DTSC::DTMI("nalu", 1)); break; @@ -225,9 +226,10 @@ namespace Converters{ int offset = (FLV_in.data[13] << 16) + (FLV_in.data[14] << 8) + FLV_in.data[15]; offset = (offset << 8) >> 8; pack_out.addContent(DTSC::DTMI("offset", offset)); + pack_out.addContent(DTSC::DTMI("data", std::string((char*)FLV_in.data+16, (size_t)FLV_in.len-20))); + }else{ + pack_out.addContent(DTSC::DTMI("data", std::string((char*)FLV_in.data+12, (size_t)FLV_in.len-16))); } - pack_out.addContent(DTSC::DTMI("time", FLV_in.tagTime())); - pack_out.addContent(DTSC::DTMI("data", std::string((char*)FLV_in.data+12, (size_t)FLV_in.len-16))); if (sending){ std::cout << pack_out.Pack(true); }else{ diff --git a/tools/FLV_Analyser/Makefile b/tools/FLV_Analyser/Makefile new file mode 100644 index 00000000..81b2e2fa --- /dev/null +++ b/tools/FLV_Analyser/Makefile @@ -0,0 +1,23 @@ +SRC = main.cpp ../../util/flv_tag.cpp ../../util/dtsc.cpp ../../util/amf.cpp ../../util/socket.cpp +OBJ = $(SRC:.cpp=.o) +OUT = FLV_Info +INCLUDES = +DEBUG = 4 +OPTIMIZE = -g +CCFLAGS = -Wall -Wextra -funsigned-char $(OPTIMIZE) -DDEBUG=$(DEBUG) +CC = $(CROSS)g++ +LD = $(CROSS)ld +AR = $(CROSS)ar +LIBS = +.SUFFIXES: .cpp +.PHONY: clean default +default: $(OUT) +.cpp.o: + $(CC) $(INCLUDES) $(CCFLAGS) $(LIBS) -c $< -o $@ +$(OUT): $(OBJ) + $(CC) $(LIBS) -o $(OUT) $(OBJ) +clean: + rm -rf $(OBJ) $(OUT) Makefile.bak *~ +install: $(OUT) + cp -f ./$(OUT) /usr/bin/ + diff --git a/tools/FLV_Analyser/main.cpp b/tools/FLV_Analyser/main.cpp new file mode 100644 index 00000000..1d4aef0e --- /dev/null +++ b/tools/FLV_Analyser/main.cpp @@ -0,0 +1,52 @@ +/// \file DTSC_Analyser/main.cpp +/// Contains the code for the DTSC Analysing tool. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../../util/flv_tag.h" //FLV support + +/// Reads DTSC from stdin and outputs human-readable information to stderr. +int main() { + + FLV::Tag FLV_in; // Temporary storage for incoming FLV data. + + + while (!feof(stdin)){ + if (FLV_in.FileLoader(stdin)){ + std::cout << "Tag: " << FLV_in.tagType() << std::endl; + printf("%hhX %hhX %hhX %hhX %hhX %hhX %hhX %hhX %hhX %hhX\n", FLV_in.data[11], FLV_in.data[12], FLV_in.data[13], FLV_in.data[14], FLV_in.data[15], FLV_in.data[16], FLV_in.data[17], FLV_in.data[18], FLV_in.data[19], FLV_in.data[20]); + printf("%hhX %hhX %hhX %hhX %hhX %hhX %hhX %hhX %hhX %hhX\n", FLV_in.data[FLV_in.len-10], FLV_in.data[FLV_in.len-9], FLV_in.data[FLV_in.len-8], FLV_in.data[FLV_in.len-7], FLV_in.data[FLV_in.len-6], FLV_in.data[FLV_in.len-5], FLV_in.data[FLV_in.len-4], FLV_in.data[FLV_in.len-3], FLV_in.data[FLV_in.len-2], FLV_in.data[FLV_in.len-1]); + std::cout << std::endl; + } + } + + + DTSC::Stream Strm; + + std::string inBuffer; + char charBuffer[1024*10]; + unsigned int charCount; + bool doneheader = false; + + while(std::cin.good()){ + //invalidate the current buffer + std::cin.read(charBuffer, 1024*10); + charCount = std::cin.gcount(); + inBuffer.append(charBuffer, charCount); + if (Strm.parsePacket(inBuffer)){ + if (!doneheader){ + doneheader = true; + Strm.metadata.Print(); + } + Strm.getPacket().Print(); + } + } + return 0; +} diff --git a/util/amf.cpp b/util/amf.cpp index b5f51fad..a11788fb 100644 --- a/util/amf.cpp +++ b/util/amf.cpp @@ -2,6 +2,7 @@ /// Holds all code for the AMF namespace. #include "amf.h" +#include #include //needed for stderr only /// Returns the std::string Indice for the current object, if available. @@ -105,43 +106,45 @@ AMF::Object::Object(std::string indice, AMF::obj0type setType){//object type ini /// Prints the contents of this object to std::cerr. /// If this object contains other objects, it will call itself recursively /// and print all nested content in a nice human-readable format. -void AMF::Object::Print(std::string indent){ - std::cerr << indent; +std::string AMF::Object::Print(std::string indent){ + std::stringstream st; + st << indent; // print my type switch (myType){ - case AMF::AMF0_NUMBER: std::cerr << "Number"; break; - case AMF::AMF0_BOOL: std::cerr << "Bool"; break; + case AMF::AMF0_NUMBER: st << "Number"; break; + case AMF::AMF0_BOOL: st << "Bool"; break; case AMF::AMF0_STRING://short string - case AMF::AMF0_LONGSTRING: std::cerr << "String"; break; - case AMF::AMF0_OBJECT: std::cerr << "Object"; break; - case AMF::AMF0_MOVIECLIP: std::cerr << "MovieClip"; break; - case AMF::AMF0_NULL: std::cerr << "Null"; break; - case AMF::AMF0_UNDEFINED: std::cerr << "Undefined"; break; - case AMF::AMF0_REFERENCE: std::cerr << "Reference"; break; - case AMF::AMF0_ECMA_ARRAY: std::cerr << "ECMA Array"; break; - case AMF::AMF0_OBJ_END: std::cerr << "Object end"; break; - case AMF::AMF0_STRICT_ARRAY: std::cerr << "Strict Array"; break; - case AMF::AMF0_DATE: std::cerr << "Date"; break; - case AMF::AMF0_UNSUPPORTED: std::cerr << "Unsupported"; break; - case AMF::AMF0_RECORDSET: std::cerr << "Recordset"; break; - case AMF::AMF0_XMLDOC: std::cerr << "XML Document"; break; - case AMF::AMF0_TYPED_OBJ: std::cerr << "Typed Object"; break; - case AMF::AMF0_UPGRADE: std::cerr << "Upgrade to AMF3"; break; - case AMF::AMF0_DDV_CONTAINER: std::cerr << "DDVTech Container"; break; + case AMF::AMF0_LONGSTRING: st << "String"; break; + case AMF::AMF0_OBJECT: st << "Object"; break; + case AMF::AMF0_MOVIECLIP: st << "MovieClip"; break; + case AMF::AMF0_NULL: st << "Null"; break; + case AMF::AMF0_UNDEFINED: st << "Undefined"; break; + case AMF::AMF0_REFERENCE: st << "Reference"; break; + case AMF::AMF0_ECMA_ARRAY: st << "ECMA Array"; break; + case AMF::AMF0_OBJ_END: st << "Object end"; break; + case AMF::AMF0_STRICT_ARRAY: st << "Strict Array"; break; + case AMF::AMF0_DATE: st << "Date"; break; + case AMF::AMF0_UNSUPPORTED: st << "Unsupported"; break; + case AMF::AMF0_RECORDSET: st << "Recordset"; break; + case AMF::AMF0_XMLDOC: st << "XML Document"; break; + case AMF::AMF0_TYPED_OBJ: st << "Typed Object"; break; + case AMF::AMF0_UPGRADE: st << "Upgrade to AMF3"; break; + case AMF::AMF0_DDV_CONTAINER: st << "DDVTech Container"; break; } // print my string indice, if available - std::cerr << " " << myIndice << " "; + st << " " << myIndice << " "; // print my numeric or string contents switch (myType){ - case AMF::AMF0_NUMBER: case AMF::AMF0_BOOL: case AMF::AMF0_REFERENCE: case AMF::AMF0_DATE: std::cerr << numval; break; - case AMF::AMF0_STRING: case AMF::AMF0_LONGSTRING: case AMF::AMF0_XMLDOC: case AMF::AMF0_TYPED_OBJ: std::cerr << strval; break; + case AMF::AMF0_NUMBER: case AMF::AMF0_BOOL: case AMF::AMF0_REFERENCE: case AMF::AMF0_DATE: st << numval; break; + case AMF::AMF0_STRING: case AMF::AMF0_LONGSTRING: case AMF::AMF0_XMLDOC: case AMF::AMF0_TYPED_OBJ: st << strval; break; default: break;//we don't care about the rest, and don't want a compiler warning... } - std::cerr << std::endl; + st << std::endl; // if I hold other objects, print those too, recursively. if (contents.size() > 0){ - for (std::vector::iterator it = contents.begin(); it != contents.end(); it++){it->Print(indent+" ");} + for (std::vector::iterator it = contents.begin(); it != contents.end(); it++){st << it->Print(indent+" ");} } + return st.str(); };//print /// Packs the AMF object to a std::string for transfer over the network. diff --git a/util/amf.h b/util/amf.h index 61f95a8a..836cfdb2 100644 --- a/util/amf.h +++ b/util/amf.h @@ -70,7 +70,7 @@ namespace AMF{ Object(std::string indice, double val, obj0type setType = AMF0_NUMBER); Object(std::string indice, std::string val, obj0type setType = AMF0_STRING); Object(std::string indice, obj0type setType = AMF0_OBJECT); - void Print(std::string indent = ""); + std::string Print(std::string indent = ""); std::string Pack(); protected: std::string myIndice; ///< Holds this objects indice, if any. diff --git a/util/dtsc.h b/util/dtsc.h index f721e6c4..138e3ab4 100644 --- a/util/dtsc.h +++ b/util/dtsc.h @@ -106,9 +106,9 @@ namespace DTSC{ class Ring { public: Ring(unsigned int v); - unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly! - bool waiting; ///< If true, this Ring is currently waiting for a buffer fill. - bool starved; ///< If true, this Ring can no longer receive valid data. + volatile unsigned int b; ///< Holds current number of buffer. May and is intended to change unexpectedly! + volatile bool waiting; ///< If true, this Ring is currently waiting for a buffer fill. + volatile bool starved; ///< If true, this Ring can no longer receive valid data. }; /// Holds temporary data for a DTSC stream and provides functions to utilize it. diff --git a/util/flv_tag.cpp b/util/flv_tag.cpp index cf75e549..bba87754 100644 --- a/util/flv_tag.cpp +++ b/util/flv_tag.cpp @@ -9,6 +9,7 @@ #include //for Tag::FileLoader #include //malloc #include //memcpy +#include /// Holds the last FLV header parsed. /// Defaults to a audio+video header on FLV version 0x01 if no header received yet. @@ -100,80 +101,84 @@ bool FLV::Tag::isInitData(){ /// audio, video or metadata, what encoding is used, and the details /// of the encoding itself. std::string FLV::Tag::tagType(){ - std::string R = ""; + std::stringstream R; + R << len << " bytes of "; switch (data[0]){ case 0x09: switch (data[11] & 0x0F){ - case 1: R += "JPEG"; break; - case 2: R += "H263"; break; - case 3: R += "ScreenVideo1"; break; - case 4: R += "VP6"; break; - case 5: R += "VP6Alpha"; break; - case 6: R += "ScreenVideo2"; break; - case 7: R += "H264"; break; - default: R += "unknown"; break; + case 1: R << "JPEG"; break; + case 2: R << "H263"; break; + case 3: R << "ScreenVideo1"; break; + case 4: R << "VP6"; break; + case 5: R << "VP6Alpha"; break; + case 6: R << "ScreenVideo2"; break; + case 7: R << "H264"; break; + default: R << "unknown"; break; } - R += " video "; + R << " video "; switch (data[11] & 0xF0){ - case 0x10: R += "keyframe"; break; - case 0x20: R += "iframe"; break; - case 0x30: R += "disposableiframe"; break; - case 0x40: R += "generatedkeyframe"; break; - case 0x50: R += "videoinfo"; break; + case 0x10: R << "keyframe"; break; + case 0x20: R << "iframe"; break; + case 0x30: R << "disposableiframe"; break; + case 0x40: R << "generatedkeyframe"; break; + case 0x50: R << "videoinfo"; break; } if ((data[11] & 0x0F) == 7){ switch (data[12]){ - case 0: R += " header"; break; - case 1: R += " NALU"; break; - case 2: R += " endofsequence"; break; + case 0: R << " header"; break; + case 1: R << " NALU"; break; + case 2: R << " endofsequence"; break; } } break; case 0x08: switch (data[11] & 0xF0){ - case 0x00: R += "linear PCM PE"; break; - case 0x10: R += "ADPCM"; break; - case 0x20: R += "MP3"; break; - case 0x30: R += "linear PCM LE"; break; - case 0x40: R += "Nelly16kHz"; break; - case 0x50: R += "Nelly8kHz"; break; - case 0x60: R += "Nelly"; break; - case 0x70: R += "G711A-law"; break; - case 0x80: R += "G711mu-law"; break; - case 0x90: R += "reserved"; break; - case 0xA0: R += "AAC"; break; - case 0xB0: R += "Speex"; break; - case 0xE0: R += "MP38kHz"; break; - case 0xF0: R += "DeviceSpecific"; break; - default: R += "unknown"; break; + case 0x00: R << "linear PCM PE"; break; + case 0x10: R << "ADPCM"; break; + case 0x20: R << "MP3"; break; + case 0x30: R << "linear PCM LE"; break; + case 0x40: R << "Nelly16kHz"; break; + case 0x50: R << "Nelly8kHz"; break; + case 0x60: R << "Nelly"; break; + case 0x70: R << "G711A-law"; break; + case 0x80: R << "G711mu-law"; break; + case 0x90: R << "reserved"; break; + case 0xA0: R << "AAC"; break; + case 0xB0: R << "Speex"; break; + case 0xE0: R << "MP38kHz"; break; + case 0xF0: R << "DeviceSpecific"; break; + default: R << "unknown"; break; } switch (data[11] & 0x0C){ - case 0x0: R += " 5.5kHz"; break; - case 0x4: R += " 11kHz"; break; - case 0x8: R += " 22kHz"; break; - case 0xC: R += " 44kHz"; break; + case 0x0: R << " 5.5kHz"; break; + case 0x4: R << " 11kHz"; break; + case 0x8: R << " 22kHz"; break; + case 0xC: R << " 44kHz"; break; } switch (data[11] & 0x02){ - case 0: R += " 8bit"; break; - case 2: R += " 16bit"; break; + case 0: R << " 8bit"; break; + case 2: R << " 16bit"; break; } switch (data[11] & 0x01){ - case 0: R += " mono"; break; - case 1: R += " stereo"; break; + case 0: R << " mono"; break; + case 1: R << " stereo"; break; } - R += " audio"; + R << " audio"; if ((data[12] == 0) && ((data[11] & 0xF0) == 0xA0)){ - R += " initdata"; + R << " initdata"; } break; - case 0x12: - R += "(meta)data"; + case 0x12:{ + R << "(meta)data: "; + AMF::Object metadata = AMF::parse((unsigned char*)data+11, len-15); + R << metadata.Print(); break; + } default: - R += "unknown"; + R << "unknown"; break; } - return R; + return R.str(); }//FLV::Tag::tagtype /// Returns the 32-bit timestamp of this tag. @@ -297,7 +302,7 @@ bool FLV::Tag::DTSCLoader(DTSC::Stream & S){ if (S.getPacket().getContentP("interframe")){data[11] += 0x20;} if (S.getPacket().getContentP("disposableframe")){data[11] += 0x30;} break; - case DTSC::AUDIO: + case DTSC::AUDIO:{ if ((unsigned int)len == S.lastData().length() + 16){ memcpy(data+12, S.lastData().c_str(), S.lastData().length()); }else{ @@ -307,12 +312,18 @@ bool FLV::Tag::DTSCLoader(DTSC::Stream & S){ data[11] = 0; if (S.metadata.getContentP("audio")->getContentP("codec")->StrValue() == "AAC"){data[11] += 0xA0;} if (S.metadata.getContentP("audio")->getContentP("codec")->StrValue() == "MP3"){data[11] += 0x20;} - if (S.metadata.getContentP("audio")->getContentP("rate")->NumValue() == 11025){data[11] += 0x04;} - if (S.metadata.getContentP("audio")->getContentP("rate")->NumValue() == 22050){data[11] += 0x08;} - if (S.metadata.getContentP("audio")->getContentP("rate")->NumValue() == 44100){data[11] += 0x0C;} + unsigned int datarate = S.metadata.getContentP("audio")->getContentP("rate")->NumValue(); + if (datarate >= 44100){ + data[11] += 0x0C; + }else if(datarate >= 22050){ + data[11] += 0x08; + }else if(datarate >= 11025){ + data[11] += 0x04; + } if (S.metadata.getContentP("audio")->getContentP("size")->NumValue() == 16){data[11] += 0x02;} if (S.metadata.getContentP("audio")->getContentP("channels")->NumValue() > 1){data[11] += 0x01;} break; + } case DTSC::META: memcpy(data+11, S.lastData().c_str(), S.lastData().length()); break; @@ -329,6 +340,9 @@ bool FLV::Tag::DTSCLoader(DTSC::Stream & S){ data[1] = ((len-15) >> 16) & 0xFF; data[2] = ((len-15) >> 8) & 0xFF; data[3] = (len-15) & 0xFF; + data[8] = 0; + data[9] = 0; + data[10] = 0; tagTime(S.getPacket().getContentP("time")->NumValue()); return true; } @@ -336,7 +350,7 @@ bool FLV::Tag::DTSCLoader(DTSC::Stream & S){ /// Helper function that properly sets the tag length from the internal len variable. void FLV::Tag::setLen(){ int len4 = len - 4; - int i = len-1; + int i = len; data[--i] = (len4) & 0xFF; len4 >>= 8; data[--i] = (len4) & 0xFF; @@ -375,6 +389,9 @@ bool FLV::Tag::DTSCVideoInit(DTSC::Stream & S){ data[1] = ((len-15) >> 16) & 0xFF; data[2] = ((len-15) >> 8) & 0xFF; data[3] = (len-15) & 0xFF; + data[8] = 0; + data[9] = 0; + data[10] = 0; tagTime(0); return true; } @@ -402,23 +419,25 @@ bool FLV::Tag::DTSCAudioInit(DTSC::Stream & S){ data[11] = 0; if (S.metadata.getContentP("audio")->getContentP("codec")->StrValue() == "AAC"){data[11] += 0xA0;} if (S.metadata.getContentP("audio")->getContentP("codec")->StrValue() == "MP3"){data[11] += 0x20;} - if (S.metadata.getContentP("audio")->getContentP("rate")->NumValue() == 11000){data[11] += 0x04;} - if (S.metadata.getContentP("audio")->getContentP("rate")->NumValue() == 22000){data[11] += 0x08;} - if (S.metadata.getContentP("audio")->getContentP("rate")->NumValue() == 44000){data[11] += 0x0C;} + unsigned int datarate = S.metadata.getContentP("audio")->getContentP("rate")->NumValue(); + if (datarate >= 44100){ + data[11] += 0x0C; + }else if(datarate >= 22050){ + data[11] += 0x08; + }else if(datarate >= 11025){ + data[11] += 0x04; + } if (S.metadata.getContentP("audio")->getContentP("size")->NumValue() == 16){data[11] += 0x02;} if (S.metadata.getContentP("audio")->getContentP("channels")->NumValue() > 1){data[11] += 0x01;} } setLen(); - switch (S.lastType()){ - case DTSC::VIDEO: data[0] = 0x09; break; - case DTSC::AUDIO: data[0] = 0x08; break; - case DTSC::META: data[0] = 0x12; break; - default: break; - } data[0] = 0x08; data[1] = ((len-15) >> 16) & 0xFF; data[2] = ((len-15) >> 8) & 0xFF; data[3] = (len-15) & 0xFF; + data[8] = 0; + data[9] = 0; + data[10] = 0; tagTime(0); return true; } @@ -501,6 +520,9 @@ bool FLV::Tag::DTSCMetaInit(DTSC::Stream & S){ data[1] = ((len-15) >> 16) & 0xFF; data[2] = ((len-15) >> 8) & 0xFF; data[3] = (len-15) & 0xFF; + data[8] = 0; + data[9] = 0; + data[10] = 0; tagTime(0); return true; }