From 26d9a6cabffdad62beb3bcdd4908ae76afc03364 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sat, 17 Nov 2012 12:14:28 +0100 Subject: [PATCH] Fixes to MistBuffer for live input as well as the input side of the RTMP connector. --- src/buffer.cpp | 20 +++++++++++++------- src/buffer_stream.cpp | 15 ++++++++++----- src/buffer_user.cpp | 9 ++------- src/conn_rtmp.cpp | 4 ++-- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/buffer.cpp b/src/buffer.cpp index 567a1566..cd3888c0 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -37,6 +37,7 @@ namespace Buffer{ Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); while (buffer_running){ usleep(1000000); //sleep one second + Stream::get()->cleanUsers(); if (!StatsSocket.connected()){ StatsSocket = Socket::Connection("/tmp/mist/statistics", true); } @@ -56,8 +57,9 @@ namespace Buffer{ #endif usr->myRing = thisStream->getRing(); - usr->S.Send(thisStream->getHeader()); - usr->S.flush(); + if (thisStream->getHeader().size() > 0){ + usr->S.SendNow(thisStream->getHeader()); + } while (usr->S.connected()){ usleep(5000); //sleep 5ms @@ -114,7 +116,6 @@ namespace Buffer{ } } usr->Disconnect("Socket closed."); - thisStream->cleanUsers(); } /// Loop reading DTSC data from stdin and processing it at the correct speed. @@ -161,8 +162,8 @@ namespace Buffer{ if (thisStream->getIPInput().connected()){ if (thisStream->getIPInput().spool()){ thisStream->getWriteLock(); - if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received().get())){ - thisStream->getStream()->outPacket(0); + if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){ + //thisStream->getStream()->outPacket(0); thisStream->dropWriteLock(true); }else{ thisStream->dropWriteLock(false); @@ -223,10 +224,15 @@ namespace Buffer{ // disconnect listener buffer_running = false; - std::cout << "End of input file - buffer shutting down" << std::endl; + std::cout << "Buffer shutting down" << std::endl; SS.close(); - if (StatsThread){StatsThread->join();} + if (StatsThread){ + StatsThread->join(); + delete StatsThread; + } + if (thisStream->getIPInput().connected()){thisStream->getIPInput().close();} StdinThread->join(); + delete StdinThread; delete thisStream; return 0; } diff --git a/src/buffer_stream.cpp b/src/buffer_stream.cpp index cafacf86..8f56327b 100644 --- a/src/buffer_stream.cpp +++ b/src/buffer_stream.cpp @@ -32,10 +32,8 @@ Buffer::Stream::~Stream(){ stats_mutex.lock(); for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ if ((**usersIt).S.connected()){ - if ((**usersIt).myRing->waiting){ - (**usersIt).S.close(); - printf("Closing user %s\n", (**usersIt).MyStr.c_str()); - } + (**usersIt).S.close(); + printf("Closing user %s\n", (**usersIt).MyStr.c_str()); } } stats_mutex.unlock(); @@ -146,7 +144,6 @@ void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string Storage["log"][username]["host"] = stats.host; Storage["log"][username]["start"] = Util::epoch() - stats.conntime; stats_mutex.unlock(); - cleanUsers(); } /// Cleans up broken connections @@ -162,6 +159,14 @@ void Buffer::Stream::cleanUsers(){ users.erase(usersIt); repeat = true; break; + }else{ + if (!(**usersIt).S.connected()){ + if ((**usersIt).Thread->joinable()){ + (**usersIt).Thread->join(); + delete (**usersIt).Thread; + (**usersIt).Thread = 0; + } + } } } } diff --git a/src/buffer_user.cpp b/src/buffer_user.cpp index b26cf6d5..6d52be68 100644 --- a/src/buffer_user.cpp +++ b/src/buffer_user.cpp @@ -31,19 +31,14 @@ Buffer::user::~user(){ /// Disconnects the current user. Doesn't do anything if already disconnected. /// Prints "Disconnected user" to stdout if disconnect took place. void Buffer::user::Disconnect(std::string reason) { - Stream::get()->clearStats(MyStr, lastStats, reason); if (S.connected()){S.close();} - if (Thread != 0){ - if (Thread->joinable()){ - Thread->join(); - } - Thread = 0; - } + Stream::get()->clearStats(MyStr, lastStats, reason); }//Disconnect /// Tries to send the current buffer, returns true if success, false otherwise. /// Has a side effect of dropping the connection if send will never complete. bool Buffer::user::doSend(const char * ptr, int len){ + if (!len){return false;}//do not do empty sends int r = S.iwrite(ptr+currsend, len-currsend); if (r <= 0){ if (errno == EWOULDBLOCK){return false;} diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp index cc28fca7..7329af5a 100644 --- a/src/conn_rtmp.cpp +++ b/src/conn_rtmp.cpp @@ -291,9 +291,9 @@ void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){ if (counter > 8){ sending = true; SS.SendNow(meta_out.toNetPacked()); - SS.SendNow(prebuffer.str().c_str());//write buffer + SS.SendNow(prebuffer.str().c_str(), prebuffer.str().size());//write buffer prebuffer.str("");//clear buffer - SS.Send(pack_out.toNetPacked()); + SS.SendNow(pack_out.toNetPacked()); }else{ prebuffer << pack_out.toNetPacked(); }