diff --git a/Buffer/Makefile b/Buffer/Makefile index 89e4f902..58ee5b2f 100644 --- a/Buffer/Makefile +++ b/Buffer/Makefile @@ -1,6 +1,6 @@ -SRC = main.cpp ../util/json.cpp ../util/socket.cpp ../util/dtsc.cpp ../util/tinythread.cpp +SRC = main.cpp ../util/json.cpp ../util/socket.cpp ../util/dtsc.cpp ../util/tinythread.cpp user.cpp stats.cpp stream.cpp OBJ = $(SRC:.cpp=.o) -OUT = DDV_Buffer +OUT = MistBuffer INCLUDES = DEBUG = 4 OPTIMIZE = -g @@ -15,9 +15,9 @@ default: $(OUT) .cpp.o: $(CC) $(INCLUDES) $(CCFLAGS) $(LIBS) -c $< -o $@ $(OUT): $(OBJ) - $(CC) $(LIBS) -o $(OUT) $(OBJ) + $(CC) $(LIBS) -o ../bin/$(OUT) $(OBJ) clean: - rm -rf $(OBJ) $(OUT) Makefile.bak *~ + rm -rf $(OBJ) ../bin/$(OUT) Makefile.bak *~ install: $(OUT) - cp -f ./$(OUT) /usr/bin/ + cp -f ../bin/$(OUT) /usr/bin/ diff --git a/Buffer/main.cpp b/Buffer/main.cpp index a458cbd1..eb60ec18 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -12,27 +12,14 @@ #include #include #include -#include "../util/dtsc.h" //DTSC support -#include "../util/socket.h" //Socket lib -#include "../util/json.h" -#include "../util/tinythread.h" +#include "stream.h" /// Holds all code unique to the Buffer. namespace Buffer{ - class user;//forward declaration - JSON::Value Storage; ///< Global storage of data. - DTSC::Stream * Strm = 0; - std::string waiting_ip = ""; ///< IP address for media push. - Socket::Connection ip_input; ///< Connection used for media push. - tthread::mutex stats_mutex; ///< Mutex for stats modifications. - tthread::mutex transfer_mutex; ///< Mutex for data transfers. - tthread::mutex socket_mutex; ///< Mutex for user deletion/work. - bool buffer_running = true; ///< Set to false when shutting down. - std::vector users; ///< All connected users. - std::vector::iterator usersIt; ///< Iterator for all connected users. - std::string name; ///< Name for this buffer. - tthread::condition_variable moreData; ///< Triggered when more data becomes available. + volatile bool buffer_running = true; ///< Set to false when shutting down. + Stream * thisStream = 0; + Socket::Server SS; ///< The server socket. /// Gets the current system time in milliseconds. unsigned int getNowMS(){ @@ -50,40 +37,18 @@ namespace Buffer{ default: return; break; } } -} -#include "stats.cpp" -#include "user.cpp" - -namespace Buffer{ void handleStats(void * empty){ if (empty != 0){return;} - Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); + Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); while (buffer_running){ usleep(1000000); //sleep one second - unsigned int now = time(0); - unsigned int tot_up = 0, tot_down = 0, tot_count = 0; - stats_mutex.lock(); - if (users.size() > 0){ - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - tot_down += usersIt->curr_down; - tot_up += usersIt->curr_up; - tot_count++; - } - } - Storage["totals"]["down"] = tot_down; - Storage["totals"]["up"] = tot_up; - Storage["totals"]["count"] = tot_count; - Storage["totals"]["now"] = now; - Storage["totals"]["buffer"] = name; if (!StatsSocket.connected()){ - StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); + StatsSocket = Socket::Connection("/tmp/mist/statistics", true); } if (StatsSocket.connected()){ - StatsSocket.write(Storage.toString()+"\n\n"); - Storage["log"].null(); + StatsSocket.write(Stream::get()->getStats()+"\n\n"); } - stats_mutex.unlock(); } } @@ -91,8 +56,8 @@ namespace Buffer{ user * usr = (user*)v_usr; std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl; - usr->myRing = Strm->getRing(); - if (!usr->S.write(Strm->outHeader())){ + usr->myRing = thisStream->getRing(); + if (!usr->S.write(thisStream->getHeader())){ usr->Disconnect("failed to receive the header!"); return; } @@ -108,10 +73,9 @@ namespace Buffer{ if (usr->inbuffer != ""){ if (usr->inbuffer[0] == 'P'){ std::cout << "Push attempt from IP " << usr->inbuffer.substr(2) << std::endl; - if (usr->inbuffer.substr(2) == waiting_ip){ - if (!ip_input.connected()){ + if (thisStream->checkWaitingIP(usr->inbuffer.substr(2))){ + if (thisStream->setInput(usr->S)){ std::cout << "Push accepted!" << std::endl; - ip_input = usr->S; usr->S = Socket::Connection(-1); return; }else{ @@ -122,38 +86,23 @@ namespace Buffer{ } } if (usr->inbuffer[0] == 'S'){ - stats_mutex.lock(); usr->tmpStats = Stats(usr->inbuffer.substr(2)); unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; if (secs < 1){secs = 1;} usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; usr->lastStats = usr->tmpStats; - Storage["curr"][usr->MyStr]["connector"] = usr->tmpStats.connector; - Storage["curr"][usr->MyStr]["up"] = usr->tmpStats.up; - Storage["curr"][usr->MyStr]["down"] = usr->tmpStats.down; - Storage["curr"][usr->MyStr]["conntime"] = usr->tmpStats.conntime; - Storage["curr"][usr->MyStr]["host"] = usr->tmpStats.host; - Storage["curr"][usr->MyStr]["start"] = (unsigned int) time(0) - usr->tmpStats.conntime; - stats_mutex.unlock(); + thisStream->saveStats(usr->MyStr, usr->tmpStats); } } } usr->Send(); } - stats_mutex.lock(); - if (users.size() > 0){ - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - if (!(*usersIt).S.connected()){ - users.erase(usersIt); - break; - } - } - } - stats_mutex.unlock(); + thisStream->cleanUsers(); std::cerr << "User " << usr->MyStr << " disconnected, socket number " << usr->S.getSocket() << std::endl; } + /// Loop reading DTSC data from stdin and processing it at the correct speed. void handleStdin(void * empty){ if (empty != 0){return;} unsigned int lastPacketTime = 0;//time in MS last packet was parsed @@ -167,28 +116,49 @@ namespace Buffer{ while (std::cin.good() && buffer_running){ //slow down packet receiving to real-time now = getNowMS(); - if ((now - lastPacketTime > currPacketTime - prevPacketTime) || (currPacketTime <= prevPacketTime)){ + if ((now - lastPacketTime >= currPacketTime - prevPacketTime) || (currPacketTime <= prevPacketTime)){ std::cin.read(charBuffer, 1024*10); charCount = std::cin.gcount(); inBuffer.append(charBuffer, charCount); - transfer_mutex.lock(); - if (Strm->parsePacket(inBuffer)){ - Strm->outPacket(0); + thisStream->getWriteLock(); + if (thisStream->getStream()->parsePacket(inBuffer)){ + thisStream->getStream()->outPacket(0); lastPacketTime = now; prevPacketTime = currPacketTime; - currPacketTime = Strm->getTime(); - moreData.notify_all(); + currPacketTime = thisStream->getStream()->getTime(); } - transfer_mutex.unlock(); + thisStream->dropWriteLock(); }else{ - if (((currPacketTime - prevPacketTime) - (now - lastPacketTime)) > 1000){ - usleep(1000000); + if (((currPacketTime - prevPacketTime) - (now - lastPacketTime)) > 999){ + usleep(999000); }else{ - usleep(((currPacketTime - prevPacketTime) - (now - lastPacketTime)) * 1000); + usleep(((currPacketTime - prevPacketTime) - (now - lastPacketTime)) * 999); } } } buffer_running = false; + SS.close(); + } + + /// Loop reading DTSC data from an IP push address. + /// No changes to the speed are made. + void handlePushin(void * empty){ + if (empty != 0){return;} + std::string inBuffer; + while (buffer_running){ + if (thisStream->getIPInput().connected()){ + if (thisStream->getIPInput().iread(inBuffer)){ + thisStream->getWriteLock(); + if (thisStream->getStream()->parsePacket(inBuffer)){ + thisStream->getStream()->outPacket(0); + } + thisStream->dropWriteLock(); + } + }else{ + usleep(1000000); + } + } + SS.close(); } /// Starts a loop, waiting for connections to send data to. @@ -206,62 +176,49 @@ namespace Buffer{ std::cout << "usage: " << argv[0] << " streamName [awaiting_IP]" << std::endl; return 1; } - name = argv[1]; + std::string name = argv[1]; bool ip_waiting = false; + std::string waiting_ip; if (argc >= 4){ waiting_ip += argv[2]; ip_waiting = true; } - std::string shared_socket = "/tmp/shared_socket_"; - shared_socket += name; - Socket::Server SS(shared_socket, false); - Strm = new DTSC::Stream(5); + SS = Socket::makeStream(name); + thisStream = Stream::get(); + thisStream->setName(name); + if (ip_waiting){ + thisStream->setWaitingIP(waiting_ip); + } Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); - Storage["log"].null(); - Storage["curr"].null(); - Storage["totals"].null(); - - //tthread::thread StatsThread = tthread::thread(handleStats, 0); + tthread::thread StatsThread = tthread::thread(handleStats, 0); tthread::thread * StdinThread = 0; if (!ip_waiting){ StdinThread = new tthread::thread(handleStdin, 0); + }else{ + StdinThread = new tthread::thread(handlePushin, 0); } - while (buffer_running){ + while (buffer_running && SS.connected()){ //check for new connections, accept them if there are any //starts a thread for every accepted connection incoming = SS.accept(false); if (incoming.connected()){ - stats_mutex.lock(); - users.push_back(incoming); - user * usr_ptr = &(users.back()); - stats_mutex.unlock(); + user * usr_ptr = new user(incoming); + thisStream->addUser(usr_ptr); usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr); } }//main loop // disconnect listener - /// \todo Deal with EOF more nicely - doesn't send the end of the stream to all users! buffer_running = false; - std::cout << "Buffer shutting down" << std::endl; + std::cout << "End of input file - buffer shutting down" << std::endl; SS.close(); - //StatsThread.join(); - if (StdinThread){StdinThread->join();} - - if (users.size() > 0){ - stats_mutex.lock(); - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - if ((*usersIt).S.connected()){ - (*usersIt).Disconnect("Terminating..."); - } - } - stats_mutex.unlock(); - } - - delete Strm; + StatsThread.join(); + StdinThread->join(); + delete thisStream; return 0; } diff --git a/Buffer/stats.cpp b/Buffer/stats.cpp index 25d244d8..db93db65 100644 --- a/Buffer/stats.cpp +++ b/Buffer/stats.cpp @@ -1,40 +1,32 @@ +#include "stats.h" +#include //for atoi() -namespace Buffer{ - /// Converts a stats line to up, down, host, connector and conntime values. - class Stats{ - public: - unsigned int up; - unsigned int down; - std::string host; - std::string connector; - unsigned int conntime; - Stats(){ - up = 0; - down = 0; - conntime = 0; - } - Stats(std::string s){ - size_t f = s.find(' '); - if (f != std::string::npos){ - host = s.substr(0, f); - s.erase(0, f+1); - } - f = s.find(' '); - if (f != std::string::npos){ - connector = s.substr(0, f); - s.erase(0, f+1); - } - f = s.find(' '); - if (f != std::string::npos){ - conntime = atoi(s.substr(0, f).c_str()); - s.erase(0, f+1); - } - f = s.find(' '); - if (f != std::string::npos){ - up = atoi(s.substr(0, f).c_str()); - s.erase(0, f+1); - down = atoi(s.c_str()); - } - } - }; +Buffer::Stats::Stats(){ + up = 0; + down = 0; + conntime = 0; +} + +Buffer::Stats::Stats(std::string s){ + size_t f = s.find(' '); + if (f != std::string::npos){ + host = s.substr(0, f); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + connector = s.substr(0, f); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + conntime = atoi(s.substr(0, f).c_str()); + s.erase(0, f+1); + } + f = s.find(' '); + if (f != std::string::npos){ + up = atoi(s.substr(0, f).c_str()); + s.erase(0, f+1); + down = atoi(s.c_str()); + } } diff --git a/Buffer/stats.h b/Buffer/stats.h new file mode 100644 index 00000000..38a31c29 --- /dev/null +++ b/Buffer/stats.h @@ -0,0 +1,16 @@ +#pragma once +#include + +namespace Buffer{ + /// Converts a stats line to up, down, host, connector and conntime values. + class Stats{ + public: + unsigned int up; + unsigned int down; + std::string host; + std::string connector; + unsigned int conntime; + Stats(); + Stats(std::string s); + }; +} diff --git a/Buffer/stream.cpp b/Buffer/stream.cpp new file mode 100644 index 00000000..7f1a282a --- /dev/null +++ b/Buffer/stream.cpp @@ -0,0 +1,216 @@ +#include "stream.h" + +/// Stores the globally equal reference. +Buffer::Stream * Buffer::Stream::ref = 0; + +/// Returns a globally equal reference to this class. +Buffer::Stream * Buffer::Stream::get(){ + static tthread::mutex creator; + if (ref == 0){ + //prevent creating two at the same time + creator.lock(); + if (ref == 0){ref = new Stream();} + creator.unlock(); + } + return ref; +} + +/// Creates a new DTSC::Stream object, private function so only one instance can exist. +Buffer::Stream::Stream(){ + Strm = new DTSC::Stream(5); +} + +/// Do cleanup on delete. +Buffer::Stream::~Stream(){ + delete Strm; + while (users.size() > 0){ + stats_mutex.lock(); + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + if ((**usersIt).S.connected()){ + if ((**usersIt).myRing->waiting){ + (**usersIt).S.close(); + printf("Closing user %s\n", (**usersIt).MyStr.c_str()); + } + } + } + stats_mutex.unlock(); + moreData.notify_all(); + cleanUsers(); + } +} + +/// Calculate and return the current statistics in JSON format. +std::string Buffer::Stream::getStats(){ + unsigned int now = time(0); + unsigned int tot_up = 0, tot_down = 0, tot_count = 0; + stats_mutex.lock(); + if (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + tot_down += (**usersIt).curr_down; + tot_up += (**usersIt).curr_up; + tot_count++; + } + } + Storage["totals"]["down"] = tot_down; + Storage["totals"]["up"] = tot_up; + Storage["totals"]["count"] = tot_count; + Storage["totals"]["now"] = now; + Storage["totals"]["buffer"] = name; + std::string ret = Storage.toString(); + Storage["log"].null(); + stats_mutex.unlock(); + return ret; +} + +/// Get a new DTSC::Ring object for a user. +DTSC::Ring * Buffer::Stream::getRing(){ + return Strm->getRing(); +} + +/// Drop a DTSC::Ring object. +void Buffer::Stream::dropRing(DTSC::Ring * ring){ + Strm->dropRing(ring); +} + +/// Get the (constant) header data of this stream. +std::string & Buffer::Stream::getHeader(){ + return Strm->outHeader(); +} + +/// Set the IP address to accept push data from. +void Buffer::Stream::setWaitingIP(std::string ip){ + waiting_ip = ip; +} + +/// Check if this is the IP address to accept push data from. +bool Buffer::Stream::checkWaitingIP(std::string ip){ + if (ip == waiting_ip || ip == "::ffff:"+waiting_ip){ + return true; + }else{ + return false; + } +} + +/// Sets the current socket for push data. +bool Buffer::Stream::setInput(Socket::Connection S){ + if (ip_input.connected()){ + return false; + }else{ + ip_input = S; + return true; + } +} + +/// Gets the current socket for push data. +Socket::Connection & Buffer::Stream::getIPInput(){ + return ip_input; +} + + +/// Stores intermediate statistics. +void Buffer::Stream::saveStats(std::string username, Stats & stats){ + stats_mutex.lock(); + Storage["curr"][username]["connector"] = stats.connector; + Storage["curr"][username]["up"] = stats.up; + Storage["curr"][username]["down"] = stats.down; + Storage["curr"][username]["conntime"] = stats.conntime; + Storage["curr"][username]["host"] = stats.host; + Storage["curr"][username]["start"] = (unsigned int) time(0) - stats.conntime; + stats_mutex.unlock(); +} + +/// Stores final statistics. +void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string reason){ + stats_mutex.lock(); + Storage["curr"].removeMember(username); + Storage["log"][username]["connector"] = stats.connector; + Storage["log"][username]["up"] = stats.up; + Storage["log"][username]["down"] = stats.down; + Storage["log"][username]["conntime"] = stats.conntime; + Storage["log"][username]["host"] = stats.host; + Storage["log"][username]["start"] = (unsigned int)time(0) - stats.conntime; + std::cout << "Disconnected user " << username << ": " << reason << ". " << stats.connector << " transferred " << stats.up << " up and " << stats.down << " down in " << stats.conntime << " seconds to " << stats.host << std::endl; + stats_mutex.unlock(); + cleanUsers(); +} + +/// Cleans up broken connections +void Buffer::Stream::cleanUsers(){ + bool repeat = false; + stats_mutex.lock(); + do{ + repeat = false; + if (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + if ((**usersIt).Thread == 0 && !(**usersIt).S.connected()){ + delete *usersIt; + users.erase(usersIt); + repeat = true; + break; + } + } + } + }while(repeat); + stats_mutex.unlock(); +} + +/// Blocks until writing is safe. +void Buffer::Stream::getWriteLock(){ + rw_mutex.lock(); + writers++; + while (writers != 1 && readers != 0){ + rw_change.wait(rw_mutex); + } + rw_mutex.unlock(); +} + +/// Drops a previously gotten write lock. +void Buffer::Stream::dropWriteLock(){ + rw_mutex.lock(); + writers--; + rw_mutex.unlock(); + rw_change.notify_all(); + moreData.notify_all(); +} + +/// Blocks until reading is safe. +void Buffer::Stream::getReadLock(){ + rw_mutex.lock(); + while (writers > 0){ + rw_change.wait(rw_mutex); + } + readers++; + rw_mutex.unlock(); +} + +/// Drops a previously gotten read lock. +void Buffer::Stream::dropReadLock(){ + rw_mutex.lock(); + readers--; + rw_mutex.unlock(); + rw_change.notify_all(); +} + +/// Retrieves a reference to the DTSC::Stream +DTSC::Stream * Buffer::Stream::getStream(){ + return Strm; +} + +/// Sets the buffer name. +void Buffer::Stream::setName(std::string n){ + name = n; +} + +/// Add a user to the userlist. +void Buffer::Stream::addUser(user * new_user){ + stats_mutex.lock(); + users.push_back(new_user); + stats_mutex.unlock(); +} + +/// Blocks the thread until new data is available. +void Buffer::Stream::waitForData(){ + stats_mutex.lock(); + moreData.wait(stats_mutex); + stats_mutex.unlock(); +} diff --git a/Buffer/stream.h b/Buffer/stream.h new file mode 100644 index 00000000..e8153db0 --- /dev/null +++ b/Buffer/stream.h @@ -0,0 +1,69 @@ +#pragma once +#include +#include "../util/tinythread.h" +#include "../util/json.h" +#include "user.h" + +namespace Buffer{ + class Stream{ + public: + /// Get a reference to this Stream object. + static Stream * get(); + /// Get the current statistics in JSON format. + std::string getStats(); + /// Get a new DTSC::Ring object for a user. + DTSC::Ring * getRing(); + /// Drop a DTSC::Ring object. + void dropRing(DTSC::Ring * ring); + /// Get the (constant) header data of this stream. + std::string & getHeader(); + /// Set the IP address to accept push data from. + void setWaitingIP(std::string ip); + /// Check if this is the IP address to accept push data from. + bool checkWaitingIP(std::string ip); + /// Sets the current socket for push data. + bool setInput(Socket::Connection S); + /// Gets the current socket for push data. + Socket::Connection & getIPInput(); + /// Stores intermediate statistics. + void saveStats(std::string username, Stats & stats); + /// Stores final statistics. + void clearStats(std::string username, Stats & stats, std::string reason); + /// Cleans up broken connections + void cleanUsers(); + /// Blocks until writing is safe. + void getWriteLock(); + /// Drops a previously gotten write lock. + void dropWriteLock(); + /// Blocks until reading is safe. + void getReadLock(); + /// Drops a previously gotten read lock. + void dropReadLock(); + /// Retrieves a reference to the DTSC::Stream + DTSC::Stream * getStream(); + /// Sets the buffer name. + void setName(std::string n); + /// Add a user to the userlist. + void addUser(user * new_user); + /// Blocks the thread until new data is available. + void waitForData(); + /// Cleanup function + ~Stream(); + private: + volatile int readers;///< Current count of active readers; + volatile int writers;///< Current count of waiting/active writers. + tthread::mutex rw_mutex; ///< Mutex for read/write locking. + tthread::condition_variable rw_change; ///< Triggered when reader/writer count changes. + static Stream * ref; + Stream(); + JSON::Value Storage; ///< Global storage of data. + DTSC::Stream * Strm; + std::string waiting_ip; ///< IP address for media push. + Socket::Connection ip_input; ///< Connection used for media push. + tthread::mutex stats_mutex; ///< Mutex for stats/users modifications. + std::vector users; ///< All connected users. + std::vector::iterator usersIt; ///< Iterator for all connected users. + std::string name; ///< Name for this buffer. + tthread::condition_variable moreData; ///< Triggered when more data becomes available. + }; +}; diff --git a/Buffer/user.cpp b/Buffer/user.cpp index 3fe1f065..d0b79898 100644 --- a/Buffer/user.cpp +++ b/Buffer/user.cpp @@ -1,97 +1,76 @@ -namespace Buffer{ - /// Holds connected users. - /// Keeps track of what buffer users are using and the connection status. - class user{ - public: - tthread::thread * Thread; ///< Holds the thread dealing with this user. - DTSC::Ring * myRing; ///< Ring of the buffer for this user. - int MyNum; ///< User ID of this user. - std::string MyStr; ///< User ID of this user as a string. - std::string inbuffer; ///< Used to buffer input data. - int currsend; ///< Current amount of bytes sent. - Stats lastStats; ///< Holds last known stats for this connection. - Stats tmpStats; ///< Holds temporary stats for this connection. - unsigned int curr_up; ///< Holds the current estimated transfer speed up. - unsigned int curr_down; ///< Holds the current estimated transfer speed down. - bool gotproperaudio; ///< Whether the user received proper audio yet. - void * lastpointer; ///< Pointer to data part of current buffer. - static int UserCount; ///< Global user counter. - Socket::Connection S; ///< Connection to user - /// Creates a new user from a newly connected socket. - /// Also prints "User connected" text to stdout. - user(Socket::Connection fd){ - S = fd; - MyNum = UserCount++; - std::stringstream st; - st << MyNum; - MyStr = st.str(); - curr_up = 0; - curr_down = 0; - currsend = 0; - myRing = 0; - Thread = 0; - std::cout << "User " << MyNum << " connected" << std::endl; - }//constructor - /// Drops held DTSC::Ring class, if one is held. - ~user(){ - Strm->dropRing(myRing); - }//destructor - /// Disconnects the current user. Doesn't do anything if already disconnected. - /// Prints "Disconnected user" to stdout if disconnect took place. - void Disconnect(std::string reason) { - if (S.connected()){S.close();} - if (Thread != 0){ - if (Thread->joinable()){Thread->join();} - Thread = 0; - } - tthread::lock_guard lock(stats_mutex); - Storage["curr"].removeMember(MyStr); - Storage["log"][MyStr]["connector"] = lastStats.connector; - Storage["log"][MyStr]["up"] = lastStats.up; - Storage["log"][MyStr]["down"] = lastStats.down; - Storage["log"][MyStr]["conntime"] = lastStats.conntime; - Storage["log"][MyStr]["host"] = lastStats.host; - Storage["log"][MyStr]["start"] = (unsigned int)time(0) - lastStats.conntime; - std::cout << "Disconnected user " << MyStr << ": " << reason << ". " << lastStats.connector << " transferred " << lastStats.up << " up and " << lastStats.down << " down in " << lastStats.conntime << " seconds to " << lastStats.host << std::endl; - }//Disconnect - /// Tries to send the current buffer, returns true if success, false otherwise. - /// Has a side effect of dropping the connection if send will never complete. - bool doSend(const char * ptr, int len){ - int r = S.iwrite(ptr+currsend, len-currsend); - if (r <= 0){ - if (errno == EWOULDBLOCK){return false;} - Disconnect(S.getError()); - return false; - } - currsend += r; - return (currsend == len); - }//doSend - /// Try to send data to this user. Disconnects if any problems occur. - void Send(){ - if (!myRing){return;}//no ring! - if (!S.connected()){return;}//cancel if not connected - if (myRing->waiting){ - tthread::lock_guard guard(transfer_mutex); - moreData.wait(transfer_mutex); - return; - }//still waiting for next buffer? +#include "user.h" +#include "stream.h" +#include - if (myRing->starved){ - //if corrupt data, warn and get new DTSC::Ring - std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl; - Strm->dropRing(myRing); - myRing = Strm->getRing(); - return; - } +int Buffer::user::UserCount = 0; - //try to complete a send - if (doSend(Strm->outPacket(myRing->b).c_str(), Strm->outPacket(myRing->b).length())){ - //switch to next buffer - currsend = 0; - if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode. - myRing->b--; - }//completed a send - }//send - }; - int user::UserCount = 0; -} +/// Creates a new user from a newly connected socket. +/// Also prints "User connected" text to stdout. +Buffer::user::user(Socket::Connection fd){ + S = fd; + MyNum = UserCount++; + std::stringstream st; + st << MyNum; + MyStr = st.str(); + curr_up = 0; + curr_down = 0; + currsend = 0; + myRing = 0; + Thread = 0; + std::cout << "User " << MyNum << " connected" << std::endl; +}//constructor + +/// Drops held DTSC::Ring class, if one is held. +Buffer::user::~user(){ + Stream::get()->dropRing(myRing); +}//destructor + +/// Disconnects the current user. Doesn't do anything if already disconnected. +/// Prints "Disconnected user" to stdout if disconnect took place. +void Buffer::user::Disconnect(std::string reason) { + if (S.connected()){S.close();} + if (Thread != 0){ + if (Thread->joinable()){Thread->join();} + Thread = 0; + } + Stream::get()->clearStats(MyStr, lastStats, reason); +}//Disconnect + +/// Tries to send the current buffer, returns true if success, false otherwise. +/// Has a side effect of dropping the connection if send will never complete. +bool Buffer::user::doSend(const char * ptr, int len){ + int r = S.iwrite(ptr+currsend, len-currsend); + if (r <= 0){ + if (errno == EWOULDBLOCK){return false;} + Disconnect(S.getError()); + return false; + } + currsend += r; + return (currsend == len); +}//doSend + +/// Try to send data to this user. Disconnects if any problems occur. +void Buffer::user::Send(){ + if (!myRing){return;}//no ring! + if (!S.connected()){return;}//cancel if not connected + if (myRing->waiting){ + Stream::get()->waitForData(); + return; + }//still waiting for next buffer? + if (myRing->starved){ + //if corrupt data, warn and get new DTSC::Ring + std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl; + Stream::get()->dropRing(myRing); + myRing = Stream::get()->getRing(); + return; + } + //try to complete a send + Stream::get()->getReadLock(); + if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){ + //switch to next buffer + currsend = 0; + if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode. + myRing->b--; + }//completed a send + Stream::get()->dropReadLock(); +}//send diff --git a/Buffer/user.h b/Buffer/user.h new file mode 100644 index 00000000..b1cb6558 --- /dev/null +++ b/Buffer/user.h @@ -0,0 +1,41 @@ +#pragma once +#include +#include "stats.h" +#include "../util/dtsc.h" +#include "../util/socket.h" +#include "../util/tinythread.h" + +namespace Buffer{ + /// Holds connected users. + /// Keeps track of what buffer users are using and the connection status. + class user{ + public: + tthread::thread * Thread; ///< Holds the thread dealing with this user. + DTSC::Ring * myRing; ///< Ring of the buffer for this user. + int MyNum; ///< User ID of this user. + std::string MyStr; ///< User ID of this user as a string. + std::string inbuffer; ///< Used to buffer input data. + int currsend; ///< Current amount of bytes sent. + Stats lastStats; ///< Holds last known stats for this connection. + Stats tmpStats; ///< Holds temporary stats for this connection. + unsigned int curr_up; ///< Holds the current estimated transfer speed up. + unsigned int curr_down; ///< Holds the current estimated transfer speed down. + bool gotproperaudio; ///< Whether the user received proper audio yet. + void * lastpointer; ///< Pointer to data part of current buffer. + static int UserCount; ///< Global user counter. + Socket::Connection S; ///< Connection to user + /// Creates a new user from a newly connected socket. + /// Also prints "User connected" text to stdout. + user(Socket::Connection fd); + /// Drops held DTSC::Ring class, if one is held. + ~user(); + /// Disconnects the current user. Doesn't do anything if already disconnected. + /// Prints "Disconnected user" to stdout if disconnect took place. + void Disconnect(std::string reason); + /// Tries to send the current buffer, returns true if success, false otherwise. + /// Has a side effect of dropping the connection if send will never complete. + bool doSend(const char * ptr, int len); + /// Try to send data to this user. Disconnects if any problems occur. + void Send(); + }; +} diff --git a/Connector_HTTP/Makefile b/Connector_HTTP/Makefile index 7ccbfc72..0e0cf9d1 100644 --- a/Connector_HTTP/Makefile +++ b/Connector_HTTP/Makefile @@ -1,6 +1,6 @@ SRC = main.cpp ../util/socket.cpp ../util/http_parser.cpp ../util/flv_tag.cpp ../util/amf.cpp ../util/dtsc.cpp ../util/config.cpp ../util/base64.cpp OBJ = $(SRC:.cpp=.o) -OUT = DDV_Conn_HTTP +OUT = MistConnHTTP INCLUDES = DEBUG = 4 OPTIMIZE = -g @@ -16,11 +16,11 @@ default: cversion $(OUT) .cpp.o: $(CC) $(INCLUDES) $(CCFLAGS) $(LIBS) -c $< -o $@ $(OUT): $(OBJ) - $(CC) $(LIBS) -o $(OUT) $(OBJ) + $(CC) $(LIBS) -o ../bin/$(OUT) $(OBJ) clean: - rm -rf $(OBJ) $(OUT) Makefile.bak *~ + rm -rf $(OBJ) ../bin/$(OUT) Makefile.bak *~ install: $(OUT) - cp -f ./$(OUT) /usr/bin/ + cp -f ../bin/$(OUT) /usr/bin/ cversion: rm -rf ../util/config.o diff --git a/Connector_HTTP/main.cpp b/Connector_HTTP/main.cpp index 3962fe18..8ea07b08 100644 --- a/Connector_HTTP/main.cpp +++ b/Connector_HTTP/main.cpp @@ -182,7 +182,7 @@ namespace Connector_HTTP{ HTTP_S.Clean(); HTTP_S.SetHeader("Content-Type", "text/xml"); HTTP_S.SetBody(""); - HTTP_S.SendResponse(conn, "200", "OK");//geen SetBody = unknown length! Dat willen we hier. + HTTP_S.SendResponse(conn, "200", "OK"); #if DEBUG >= 3 printf("Sending crossdomain.xml file\n"); #endif @@ -206,15 +206,7 @@ namespace Connector_HTTP{ Movie = HTTP_R.url.substr(1); Movie = Movie.substr(0,Movie.find("/")); } - streamname = "/tmp/shared_socket_"; - for (std::string::iterator i=Movie.end()-1; i>=Movie.begin(); --i){ - if (!isalpha(*i) && !isdigit(*i) && *i != '_'){ - Movie.erase(i); - }else{ - *i=tolower(*i); - }//strip nonalphanumeric - } - streamname += Movie; + streamname = Movie; if( !Flash_ManifestSent ) { HTTP_S.Clean(); HTTP_S.SetHeader("Content-Type","text/xml"); @@ -227,22 +219,18 @@ namespace Connector_HTTP{ ready4data = true; }//FLASH handler if (handler == HANDLER_PROGRESSIVE){ - //in het geval progressive nemen we aan dat de URL de streamname is, met .flv erachter + //we assume the URL is the stream name with a 3 letter extension std::string extension = HTTP_R.url.substr(HTTP_R.url.size()-4); - streamname = HTTP_R.url.substr(0, HTTP_R.url.size()-4);//strip de .flv - for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ - if (!isalpha(*i) && !isdigit(*i) && *i != '_'){streamname.erase(i);}else{*i=tolower(*i);}//strip nonalphanumeric - } - streamname = "/tmp/shared_socket_" + streamname;//dit is dan onze shared_socket - //normaal zouden we ook een position uitlezen uit de URL, maar bij LIVE streams is dat zinloos + streamname = HTTP_R.url.substr(0, HTTP_R.url.size()-4);//strip the extension + /// \todo VoD streams will need support for position reading from the URL parameters ready4data = true; }//PROGRESSIVE handler - HTTP_R.CleanForNext(); //maak schoon na verwerken voor eventuele volgende requests... + HTTP_R.CleanForNext(); //clean for any possinble next requests } if (ready4data){ if (!inited){ //we are ready, connect the socket! - ss = Socket::Connection(streamname); + ss = Socket::getStream(streamname); if (!ss.connected()){ #if DEBUG >= 1 fprintf(stderr, "Could not connect to server!\n"); diff --git a/Connector_RAW/Makefile b/Connector_RAW/Makefile index c6b3ec59..1e3aa1bc 100644 --- a/Connector_RAW/Makefile +++ b/Connector_RAW/Makefile @@ -1,6 +1,6 @@ SRC = main.cpp ../util/socket.cpp OBJ = $(SRC:.cpp=.o) -OUT = DDV_Conn_RAW +OUT = MistConnRAW INCLUDES = DEBUG = 4 OPTIMIZE = -g @@ -15,9 +15,9 @@ default: $(OUT) .cpp.o: $(CC) $(INCLUDES) $(CCFLAGS) $(LIBS) -c $< -o $@ $(OUT): $(OBJ) - $(CC) $(LIBS) -o $(OUT) $(OBJ) + $(CC) $(LIBS) -o ../bin/$(OUT) $(OBJ) clean: - rm -rf $(OBJ) $(OUT) Makefile.bak *~ + rm -rf $(OBJ) ../bin/$(OUT) Makefile.bak *~ install: $(OUT) - cp -f ./$(OUT) /usr/bin/ + cp -f ../bin/$(OUT) /usr/bin/ diff --git a/Connector_RTMP/Makefile b/Connector_RTMP/Makefile index 0a337458..7f6ab177 100644 --- a/Connector_RTMP/Makefile +++ b/Connector_RTMP/Makefile @@ -1,6 +1,6 @@ SRC = main.cpp ../util/socket.cpp ../util/flv_tag.cpp ../util/amf.cpp ../util/rtmpchunks.cpp ../util/crypto.cpp ../util/config.cpp ../util/dtsc.cpp OBJ = $(SRC:.cpp=.o) -OUT = DDV_Conn_RTMP +OUT = MistConnRTMP INCLUDES = STATIC = DEBUG = 4 @@ -17,11 +17,11 @@ default: cversion $(OUT) .cpp.o: $(CC) $(INCLUDES) $(CCFLAGS) -c $< -o $@ $(OUT): $(OBJ) - $(CC) -o $(OUT) $(OBJ) $(STATIC) $(LIBS) + $(CC) -o ../bin/$(OUT) $(OBJ) $(STATIC) $(LIBS) clean: - rm -rf $(OBJ) $(OUT) Makefile.bak *~ + rm -rf $(OBJ) ../bin/$(OUT) Makefile.bak *~ install: $(OUT) - cp -f ./$(OUT) /usr/bin/ + cp -f ../bin/$(OUT) /usr/bin/ cversion: rm -rf ../util/config.o diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 94a737a2..20c860db 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -27,7 +27,7 @@ namespace Connector_RTMP{ Socket::Connection Socket; ///< Socket connected to user Socket::Connection SS; ///< Socket connected to server - std::string streamname = "/tmp/shared_socket"; ///< Stream that will be opened + std::string streamname; ///< Stream that will be opened void parseChunk(std::string & buffer);///< Parses a single RTMP chunk. void sendCommand(AMF::Object & amfreply, int messagetype, int stream_id);///< Sends a RTMP command either in AMF or AMF3 mode. void parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id);///< Parses a single AMF command message. @@ -73,7 +73,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ if (ready4data){ if (!inited){ //we are ready, connect the socket! - SS = Socket::Connection(streamname); + SS = Socket::getStream(streamname); if (!SS.connected()){ #if DEBUG >= 1 fprintf(stderr, "Could not connect to server!\n"); @@ -398,20 +398,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int if ((amfdata.getContentP(0)->StrValue() == "publish")){ if (amfdata.getContentP(3)){ streamname = amfdata.getContentP(3)->StrValue(); - for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){ - if (*i == '?'){streamname.erase(i, streamname.end()); break;} - if (!isalpha(*i) && !isdigit(*i) && *i != '_'){ - streamname.erase(i); - --i; - }else{ - *i=tolower(*i); - } - } - streamname = "/tmp/shared_socket_" + streamname; - #if DEBUG >= 4 - fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str()); - #endif - SS = Socket::Connection(streamname); + SS = Socket::getStream(streamname); if (!SS.connected()){ #if DEBUG >= 1 fprintf(stderr, "Could not connect to server!\n"); @@ -468,10 +455,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ //send streambegin streamname = amfdata.getContentP(3)->StrValue(); - for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ - if (!isalpha(*i) && !isdigit(*i) && *i != '_'){streamname.erase(i);}else{*i=tolower(*i);} - } - streamname = "/tmp/shared_socket_" + streamname; Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 //send a status reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); diff --git a/DDV_Controller/Makefile b/Controller/Makefile similarity index 88% rename from DDV_Controller/Makefile rename to Controller/Makefile index 2782d63f..757666b2 100644 --- a/DDV_Controller/Makefile +++ b/Controller/Makefile @@ -1,6 +1,6 @@ SRC = main.cpp ../util/json.cpp ../util/socket.cpp ../util/http_parser.cpp ../util/md5.cpp ../util/config.cpp ../util/procs.cpp ../util/base64.cpp ../util/auth.cpp OBJ = $(SRC:.cpp=.o) -OUT = DDV_Controller +OUT = MistController INCLUDES = DEBUG = 4 OPTIMIZE = -g @@ -19,9 +19,9 @@ default: cversion $(OUT) .cpp.o: $(CC) $(INCLUDES) $(CCFLAGS) -c $< -o $@ $(OUT): $(OBJ) - $(CC) -o $(OUT) $(OBJ) $(LIBS) + $(CC) -o ../bin/$(OUT) $(OBJ) $(LIBS) clean: - rm -rf $(OBJ) $(OUT) Makefile.bak *~ + rm -rf $(OBJ) ../bin/$(OUT) Makefile.bak *~ cversion: rm -rf ../util/config.o diff --git a/DDV_Controller/main.cpp b/Controller/main.cpp similarity index 100% rename from DDV_Controller/main.cpp rename to Controller/main.cpp diff --git a/Makefile b/Makefile index 98bf11e1..ad0baa78 100644 --- a/Makefile +++ b/Makefile @@ -1,39 +1,43 @@ default: client .PHONY: client client-debug client-clean clean release-install debug-install docs -client-debug: +prepare: + mkdir -p ./bin + cp -f *.html ./bin/ +client-debug: prepare cd Connector_HTTP; $(MAKE) cd Connector_RTMP; $(MAKE) cd Connector_RAW; $(MAKE) cd Buffer; $(MAKE) - cd DDV_Controller; $(MAKE) + cd Controller; $(MAKE) client: client-debug client-clean: cd Connector_HTTP; $(MAKE) clean cd Connector_RTMP; $(MAKE) clean cd Connector_RAW; $(MAKE) clean cd Buffer; $(MAKE) clean - cd DDV_Controller; $(MAKE) clean + cd Controller; $(MAKE) clean clean: client-clean -client-release: + rm -rf ./bin +client-release: prepare cd Connector_HTTP; $(MAKE) DEBUG=0 OPTIMIZE=-O2 cd Connector_RTMP; $(MAKE) DEBUG=0 OPTIMIZE=-O2 cd Connector_RAW; $(MAKE) DEBUG=0 OPTIMIZE=-O2 cd Buffer; $(MAKE) DEBUG=0 OPTIMIZE=-O2 - cd DDV_Controller; $(MAKE) DEBUG=0 OPTIMIZE=-O2 + cd Controller; $(MAKE) DEBUG=0 OPTIMIZE=-O2 release: client-release release-install: client-clean client-release cd Connector_RTMP; $(MAKE) install cd Connector_HTTP; $(MAKE) install cd Connector_RAW; $(MAKE) install cd Buffer; $(MAKE) install - cd DDV_Controller; $(MAKE) install + cd Controller; $(MAKE) install debug-install: client-clean client-debug cd Connector_RTMP; $(MAKE) install cd Connector_HTTP; $(MAKE) install cd Connector_RAW; $(MAKE) install cd Buffer; $(MAKE) install - cd DDV_Controller; $(MAKE) install + cd Controller; $(MAKE) install docs: doxygen ./Doxyfile > /dev/null diff --git a/util/config.cpp b/util/config.cpp index b426766c..b7533fc3 100644 --- a/util/config.cpp +++ b/util/config.cpp @@ -26,12 +26,7 @@ Util::Config::Config(){ listen_port = 4242; daemon_mode = true; interface = "0.0.0.0"; - configfile = "/etc/ddvtech.conf"; username = "root"; - ignore_daemon = false; - ignore_interface = false; - ignore_port = false; - ignore_user = false; } /// Parses commandline arguments. @@ -48,17 +43,15 @@ void Util::Config::parseArgs(int argc, char ** argv){ {"username",1,0,'u'}, {"no-daemon",0,0,'n'}, {"daemon",0,0,'d'}, - {"configfile",1,0,'c'}, {"version",0,0,'v'} }; while ((opt = getopt_long(argc, argv, optString, longOpts, 0)) != -1){ switch (opt){ - case 'p': listen_port = atoi(optarg); ignore_port = true; break; - case 'i': interface = optarg; ignore_interface = true; break; - case 'n': daemon_mode = false; ignore_daemon = true; break; - case 'd': daemon_mode = true; ignore_daemon = true; break; - case 'c': configfile = optarg; break; - case 'u': username = optarg; ignore_user = true; break; + case 'p': listen_port = atoi(optarg); break; + case 'i': interface = optarg; break; + case 'n': daemon_mode = false; break; + case 'd': daemon_mode = true; break; + case 'u': username = optarg; break; case 'v': printf("%s\n", TOSTRING(VERSION)); exit(1); @@ -67,16 +60,9 @@ void Util::Config::parseArgs(int argc, char ** argv){ case '?': std::string doingdaemon = "true"; if (!daemon_mode){doingdaemon = "false";} - if (confsection == ""){ - printf("Options: -h[elp], -?, -v[ersion], -n[odaemon], -d[aemon], -p[ort] VAL, -i[nterface] VAL, -u[sername] VAL\n"); - printf("Defaults:\n interface: %s\n port: %i\n daemon mode: %s\n username: %s\n", interface.c_str(), listen_port, doingdaemon.c_str(), username.c_str()); - }else{ - printf("Options: -h[elp], -?, -v[ersion], -n[odaemon], -d[aemon], -p[ort] VAL, -i[nterface] VAL, -c[onfigfile] VAL, -u[sername] VAL\n"); - printf("Defaults:\n interface: %s\n port: %i\n daemon mode: %s\n configfile: %s\n username: %s\n", interface.c_str(), listen_port, doingdaemon.c_str(), configfile.c_str(), username.c_str()); - printf("Username root means no change to UID, no matter what the UID is.\n"); - printf("If the configfile exists, it is always loaded first. Commandline settings then overwrite the config file.\n"); - printf("\nThis process takes it directives from the %s section of the configfile.\n", confsection.c_str()); - } + printf("Options: -h[elp], -?, -v[ersion], -n[odaemon], -d[aemon], -p[ort] VAL, -i[nterface] VAL, -u[sername] VAL\n"); + printf("Defaults:\n interface: %s\n port: %i\n daemon mode: %s\n username: %s\n", interface.c_str(), listen_port, doingdaemon.c_str(), username.c_str()); + printf("Username root means no change to UID, no matter what the UID is.\n"); printf("This is %s version %s\n", argv[0], TOSTRING(VERSION)); exit(1); break; @@ -84,37 +70,6 @@ void Util::Config::parseArgs(int argc, char ** argv){ }//commandline options parser } -/// Parses the configuration file at configfile, if it exists. -/// Assumes confsection is set. -void Util::Config::parseFile(){ - std::ifstream conf(configfile.c_str(), std::ifstream::in); - std::string tmpstr; - bool acc_comm = false; - size_t foundeq; - if (conf.fail()){ - #if DEBUG >= 3 - fprintf(stderr, "Configuration file %s not found - using build-in defaults...\n", configfile.c_str()); - #endif - }else{ - while (conf.good()){ - getline(conf, tmpstr); - if (tmpstr[0] == '['){//new section? check if we care. - if (tmpstr == confsection){acc_comm = true;}else{acc_comm = false;} - }else{ - if (!acc_comm){break;}//skip all lines in this section if we do not care about it - foundeq = tmpstr.find('='); - if (foundeq != std::string::npos){ - if ((tmpstr.substr(0, foundeq) == "port") && !ignore_port){listen_port = atoi(tmpstr.substr(foundeq+1).c_str());} - if ((tmpstr.substr(0, foundeq) == "interface") && !ignore_interface){interface = tmpstr.substr(foundeq+1);} - if ((tmpstr.substr(0, foundeq) == "username") && !ignore_user){username = tmpstr.substr(foundeq+1);} - if ((tmpstr.substr(0, foundeq) == "daemon") && !ignore_daemon){daemon_mode = true;} - if ((tmpstr.substr(0, foundeq) == "nodaemon") && !ignore_daemon){daemon_mode = false;} - }//found equals sign - }//section contents - }//configfile line loop - }//configuration -} - /// Sets the current process' running user void Util::setUser(std::string username){ if (username != "root"){ diff --git a/util/config.h b/util/config.h index 5a0b0da9..acf3fb17 100644 --- a/util/config.h +++ b/util/config.h @@ -9,23 +9,15 @@ /// Contains utility code, not directly related to streaming media namespace Util{ - /// Deals with parsing configuration from files or commandline options. + /// Deals with parsing configuration from commandline options. class Config{ - private: - bool ignore_daemon; - bool ignore_interface; - bool ignore_port; - bool ignore_user; - public: - std::string confsection; - std::string configfile; - bool daemon_mode; - std::string interface; - int listen_port; - std::string username; - Config(); - void parseArgs(int argc, char ** argv); - void parseFile(); + public: + bool daemon_mode; + std::string interface; + int listen_port; + std::string username; + Config(); + void parseArgs(int argc, char ** argv); }; /// Will set the active user to the named username. diff --git a/util/server_setup.cpp b/util/server_setup.cpp index 47f015ac..8f8cfad4 100644 --- a/util/server_setup.cpp +++ b/util/server_setup.cpp @@ -15,12 +15,6 @@ #endif -#ifndef CONFIGSECT - /// Configuration file section for this server. - #define CONFIGSECT None - #error "No configuration file section was set!" -#endif - #include "socket.h" //Socket library #include "config.h" //utilities for config management #include @@ -83,10 +77,8 @@ int main(int argc, char ** argv){ //set and parse configuration Util::Config C; - C.confsection = TOSTRING(CONFIGSECT); C.listen_port = DEFAULT_PORT; C.parseArgs(argc, argv); - C.parseFile(); //setup a new server socket, for the correct interface and port server_socket = Socket::Server(C.listen_port, C.interface); diff --git a/util/socket.cpp b/util/socket.cpp index e751b097..603075ee 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -3,6 +3,7 @@ /// Written by Jaron Vietor in 2010 for DDVTech #include "socket.h" +#include #include #include #include @@ -647,3 +648,43 @@ bool Socket::Server::connected(){ /// Returns internal socket number. int Socket::Server::getSocket(){return sock;} + +/// Connect to a stream on the system. +/// Filters the streamname, removing invalid characters and +/// converting all letters to lowercase. +/// If a '?' character is found, everything following that character is deleted. +Socket::Connection Socket::getStream(std::string streamname){ + //strip anything that isn't numbers, digits or underscores + for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ + if (*i == '?'){streamname.erase(i, streamname.end()); break;} + if (!isalpha(*i) && !isdigit(*i) && *i != '_'){ + streamname.erase(i); + }else{ + *i=tolower(*i); + } + } + return Socket::Connection("/tmp/mist/stream_"+streamname); +} + +/// Create a stream on the system. +/// Filters the streamname, removing invalid characters and +/// converting all letters to lowercase. +/// If a '?' character is found, everything following that character is deleted. +/// If the /tmp/ddvtech directory doesn't exist yet, this will create it. +Socket::Server Socket::makeStream(std::string streamname){ + //strip anything that isn't numbers, digits or underscores + for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ + if (*i == '?'){streamname.erase(i, streamname.end()); break;} + if (!isalpha(*i) && !isdigit(*i) && *i != '_'){ + streamname.erase(i); + }else{ + *i=tolower(*i); + } + } + std::string loc = "/tmp/mist/stream_"+streamname; + //attempt to create the /tmp/mist directory if it doesn't exist already. + //ignore errors - we catch all problems in the Socket::Server creation already + mkdir("/tmp/mist", S_IRWXU | S_IRWXG | S_IRWXO); + //create and return the Socket::Server + return Socket::Server(loc); +} diff --git a/util/socket.h b/util/socket.h index 3a538286..503ec4e9 100644 --- a/util/socket.h +++ b/util/socket.h @@ -77,4 +77,10 @@ namespace Socket{ int getSocket(); ///< Returns internal socket number. }; + /// Connect to a stream on the system. + Connection getStream(std::string streamname); + + /// Create a stream on the system. + Server makeStream(std::string streamname); + };