diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index aef4f172..0f93d803 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -1196,13 +1196,13 @@ namespace IPC { } if (!hasCounter) { DEBUG_MSG(DLVL_WARN, "Trying to time-out an element without counters"); + myPage.close(); return; } - if (myPage.mapped) { - semGuard tmpGuard(&mySemaphore); - myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0); - HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1)); - } + semGuard tmpGuard(&mySemaphore); + myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0); + HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1)); + myPage.close(); } ///\brief Re-initialize the counter diff --git a/src/io.cpp b/src/io.cpp index fc85a3f3..4dff9e39 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -29,6 +29,18 @@ namespace Mist { } + void negotiationProxy::clear(){ + pagesByTrack.clear(); + trackOffset.clear(); + trackState.clear(); + trackMap.clear(); + metaPages.clear(); + curPageNum.clear(); + curPage.clear(); + negTimer = 0; + userClient.finish(); + } + bool InOutBase::bufferStart(unsigned long tid, unsigned long pageNumber) { VERYHIGH_MSG("bufferStart for stream %s, track %lu, page %lu", streamName.c_str(), tid, pageNumber); //Initialize the stream metadata if it does not yet exist @@ -239,15 +251,6 @@ namespace Mist { return 0; } - ///Buffers the next packet on the currently opened page - ///\param pack The packet to buffer - void InOutBase::bufferNext(JSON::Value & pack) { - std::string packData = pack.toNetPacked(); - DTSC::Packet newPack(packData.data(), packData.size()); - ///\note Internally calls bufferNext(DTSC::Packet & pack) - nProxy.bufferNext(newPack, myMeta); - } - ///Buffers the next packet on the currently opened page ///\param pack The packet to buffer void InOutBase::bufferNext(DTSC::Packet & pack) { @@ -384,19 +387,6 @@ namespace Mist { curPageNum.erase(tid); } - ///Buffers a live packet to a page. - /// - ///Handles both buffering and creation of new pages - /// - ///Initiates/continues negotiation with the buffer as well - ///\param packet The packet to buffer - void InOutBase::bufferLivePacket(JSON::Value & packet) { - DTSC::Packet realPacket; - realPacket.genericFill(packet["time"].asInt(), packet["offset"].asInt(), packet["trackid"].asInt(), packet["data"].asStringRef().c_str(), packet["data"].asStringRef().size(), packet["bpos"].asInt(), packet["keyframe"].asInt()); - bufferLivePacket(realPacket); - } - - ///Buffers a live packet to a page. /// ///Handles both buffering and creation of new pages @@ -531,6 +521,10 @@ namespace Mist { nProxy.continueNegotiate(tid, myMeta, quickNegotiate); } + negotiationProxy::negotiationProxy(){ + negTimer = 0; + } + void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate) { if (!tid) { return; @@ -639,9 +633,11 @@ namespace Mist { INSANE_MSG("NewTid: %0.8lX", newTid); if (newTid == 0x80000000u) { INSANE_MSG("Breaking because not set yet"); + negTimer++; break; } HIGH_MSG("Track %lu temporarily mapped to %lu", tid, newTid); + negTimer = 0; char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), newTid); @@ -669,6 +665,7 @@ namespace Mist { unsigned long firstPage = firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5]; if (firstPage == 0xFFFF) { HIGH_MSG("Negotiating, but firstPage not yet set, waiting for buffer"); + negTimer++; break; } #if defined(__CYGWIN__) || defined(_WIN32) @@ -682,6 +679,7 @@ namespace Mist { trackMap.erase(tid); break; } + negTimer = 0; //Reinitialize so we can be sure we got the right values here finalTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3]; firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5]; diff --git a/src/io.h b/src/io.h index 813473d1..0ab55171 100644 --- a/src/io.h +++ b/src/io.h @@ -27,7 +27,8 @@ namespace Mist { class negotiationProxy { public: - negotiationProxy() {} + negotiationProxy(); + void clear(); bool bufferStart(unsigned long tid, unsigned long pageNumber, DTSC::Meta & myMeta); void bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta); void bufferFinalize(unsigned long tid, DTSC::Meta &myMeta); @@ -55,6 +56,7 @@ namespace Mist { std::string streamName;///< Name of the stream to connect to void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate = false); + uint32_t negTimer; ///< How long we've been negotiating, in packets. }; ///\brief Class containing all basic input and output functions. @@ -63,11 +65,9 @@ namespace Mist { void initiateMeta(); bool bufferStart(unsigned long tid, unsigned long pageNumber); void bufferNext(DTSC::Packet & pack); - void bufferNext(JSON::Value & pack); void bufferFinalize(unsigned long tid); void bufferRemove(unsigned long tid, unsigned long pageNumber); - void bufferLivePacket(JSON::Value & packet); - void bufferLivePacket(DTSC::Packet & packet); + virtual void bufferLivePacket(DTSC::Packet & packet); protected: void continueNegotiate(unsigned long tid, bool quickNegotiate = false); diff --git a/src/output/output.cpp b/src/output/output.cpp index 8f3a111b..6ade1eaa 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -38,7 +38,17 @@ namespace Mist{ cfg->addOption("noinput", option); } + void Output::bufferLivePacket(DTSC::Packet & packet){ + if (nProxy.negTimer > 600){ + WARN_MSG("No negotiation response from buffer - reconnecting."); + nProxy.clear(); + reconnect(); + } + InOutBase::bufferLivePacket(packet); + } + Output::Output(Socket::Connection & conn) : myConn(conn){ + pushing = false; firstTime = 0; crc = getpid(); parseData = false; @@ -74,8 +84,8 @@ namespace Mist{ } void Output::updateMeta(){ - //cancel if not alive - if (!nProxy.userClient.isAlive()){ + //cancel if not alive or pushing a new stream + if (!nProxy.userClient.isAlive() || (isPushing() && myMeta.tracks.size())){ return; } //read metadata from page to myMeta variable @@ -144,6 +154,7 @@ namespace Mist{ } bool Output::isReadyForPlay(){ + if (isPushing()){return true;} if (myMeta.tracks.size()){ if (!selectedTracks.size()){ selectDefaultTracks(); @@ -992,6 +1003,9 @@ namespace Mist{ /// Outputs used as an input should return INPUT, outputs used for automation should return OUTPUT, others should return their proper name. /// The default implementation is usually good enough for all the non-INPUT types. std::string Output::getStatsName(){ + if (isPushing()){ + return "INPUT"; + } if (config->hasOption("target") && config->getString("target").size()){ return "OUTPUT"; }else{ @@ -1045,7 +1059,7 @@ namespace Mist{ myConn.close(); return; } - if (!nProxy.trackMap.size()){ + if (!isPushing()){ IPC::userConnection userConn(nProxy.userClient.getData()); for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){ userConn.setTrackId(tNum, *it); @@ -1092,6 +1106,7 @@ namespace Mist{ /// Runs all appropriate triggers and checks. /// Returns true if the push should continue, false otherwise. bool Output::allowPush(const std::string & passwd){ + pushing = true; std::string strmSource; // Initialize the stream source if needed, connect to it @@ -1101,18 +1116,22 @@ namespace Mist{ if (!strmSource.size()){ FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); + pushing = false; return false; } if (strmSource.substr(0, 7) != "push://"){ FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str()); + pushing = false; return false; } std::string source = strmSource.substr(7); std::string IP = source.substr(0, source.find('@')); + if (IP != ""){ if (!myConn.isAddress(IP)){ FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); + pushing = false; return false; } } diff --git a/src/output/output.h b/src/output/output.h index ec7883df..539a17c0 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -84,8 +84,9 @@ namespace Mist { std::map nxtKeyNum;///< Contains the number of the next key, for page seeking purposes. std::set buffer;///< A sorted list of next-to-be-loaded packets. bool sought;/// bookKeeping; + virtual bool isPushing(){return pushing;}; bool allowPush(const std::string & passwd); + void bufferLivePacket(DTSC::Packet & packet); }; } diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index d9210010..aa55305e 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -53,9 +53,9 @@ namespace Mist { Output::onFail(); } - /// We assume it's ready to play if there is at least one track available + /// The HTTP output is always ready to play bool OutHTTP::isReadyForPlay() { - return myMeta.tracks.size(); + return true; } void OutHTTP::init(Util::Config * cfg){ @@ -68,7 +68,7 @@ namespace Mist { capa["url_match"].append("/crossdomain.xml"); capa["url_match"].append("/clientaccesspolicy.xml"); capa["url_match"].append("/$.html"); - capa["url_match"].append("/$.ico"); + capa["url_match"].append("/favicon.ico"); capa["url_match"].append("/$.smil"); capa["url_match"].append("/info_$.js"); capa["url_match"].append("/json_$.js"); diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index de2cf5c3..bd6fc99b 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -9,7 +9,6 @@ namespace Mist { OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) { - isPushing = false; setBlocking(true); while (!conn.Received().available(1537) && conn.connected() && config->is_active) { conn.spool(); @@ -36,21 +35,6 @@ namespace Mist { minSkipAhead = 500; } - bool OutRTMP::isReadyForPlay(){ - if (isPushing){ - return true; - } - return Output::isReadyForPlay(); - } - - std::string OutRTMP::getStatsName(){ - if (isPushing){ - return "INPUT"; - }else{ - return Output::getStatsName(); - } - } - bool OutRTMP::onFinish(){ MEDIUM_MSG("Finishing stream %s, %s", streamName.c_str(), myConn?"while connected":"already disconnected"); if (myConn){ @@ -587,9 +571,7 @@ namespace Mist { Util::sanitizeName(streamName); - isPushing = true; if (!allowPush("")){ - isPushing = false; onFinish(); return; } diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index c9689011..3256c6b5 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -13,17 +13,14 @@ namespace Mist { void onRequest(); void sendNext(); void sendHeader(); - bool isReadyForPlay(); bool onFinish(); protected: uint64_t rtmpOffset; - bool isPushing; void parseVars(std::string data); std::string app_name; void parseChunk(Socket::Buffer & inputBuffer); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId); void sendCommand(AMF::Object & amfReply, int messageType, int streamId); - std::string getStatsName(); }; }