diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 3671ca99..0caec676 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -1194,13 +1194,13 @@ namespace IPC { } if (!hasCounter) { DEBUG_MSG(DLVL_WARN, "Trying to time-out an element without counters"); + myPage.close(); return; } - if (myPage.mapped) { - semGuard tmpGuard(&mySemaphore); - myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0); - HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1)); - } + semGuard tmpGuard(&mySemaphore); + myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0); + HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1)); + myPage.close(); } ///\brief Re-initialize the counter diff --git a/src/io.cpp b/src/io.cpp index 86d58ce5..b0f0fd96 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -29,6 +29,18 @@ namespace Mist { } + void negotiationProxy::clear(){ + pagesByTrack.clear(); + trackOffset.clear(); + trackState.clear(); + trackMap.clear(); + metaPages.clear(); + curPageNum.clear(); + curPage.clear(); + negTimer = 0; + userClient.finish(); + } + /*LTS-START*/ void negotiationProxy::initiateEncryption(){ static bool encInit = false; @@ -282,15 +294,6 @@ namespace Mist { return 0; } - ///Buffers the next packet on the currently opened page - ///\param pack The packet to buffer - void InOutBase::bufferNext(JSON::Value & pack) { - std::string packData = pack.toNetPacked(); - DTSC::Packet newPack(packData.data(), packData.size()); - ///\note Internally calls bufferNext(DTSC::Packet & pack) - nProxy.bufferNext(newPack, myMeta); - } - ///Buffers the next packet on the currently opened page ///\param pack The packet to buffer void InOutBase::bufferNext(DTSC::Packet & pack) { @@ -449,19 +452,6 @@ namespace Mist { curPageNum.erase(tid); } - ///Buffers a live packet to a page. - /// - ///Handles both buffering and creation of new pages - /// - ///Initiates/continues negotiation with the buffer as well - ///\param packet The packet to buffer - void InOutBase::bufferLivePacket(JSON::Value & packet) { - DTSC::Packet realPacket; - realPacket.genericFill(packet["time"].asInt(), packet["offset"].asInt(), packet["trackid"].asInt(), packet["data"].asStringRef().c_str(), packet["data"].asStringRef().size(), packet["bpos"].asInt(), packet["keyframe"].asInt()); - bufferLivePacket(realPacket); - } - - ///Buffers a live packet to a page. /// ///Handles both buffering and creation of new pages @@ -596,6 +586,11 @@ namespace Mist { nProxy.continueNegotiate(tid, myMeta, quickNegotiate); } + negotiationProxy::negotiationProxy(){ + encrypt = false; + negTimer = 0; + } + void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate) { if (!tid) { return; @@ -704,9 +699,11 @@ namespace Mist { INSANE_MSG("NewTid: %0.8lX", newTid); if (newTid == 0x80000000u) { INSANE_MSG("Breaking because not set yet"); + negTimer++; break; } HIGH_MSG("Track %lu temporarily mapped to %lu", tid, newTid); + negTimer = 0; char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), newTid); @@ -734,6 +731,7 @@ namespace Mist { unsigned long firstPage = firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5]; if (firstPage == 0xFFFF) { HIGH_MSG("Negotiating, but firstPage not yet set, waiting for buffer"); + negTimer++; break; } #if defined(__CYGWIN__) || defined(_WIN32) @@ -747,6 +745,7 @@ namespace Mist { trackMap.erase(tid); break; } + negTimer = 0; //Reinitialize so we can be sure we got the right values here finalTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3]; firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5]; diff --git a/src/io.h b/src/io.h index 56617c89..feef5a9e 100644 --- a/src/io.h +++ b/src/io.h @@ -28,7 +28,8 @@ namespace Mist { class negotiationProxy { public: - negotiationProxy() : encrypt(false) {} + negotiationProxy(); + void clear(); void initiateEncryption();//LTS bool bufferStart(unsigned long tid, unsigned long pageNumber, DTSC::Meta & myMeta); void bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta); @@ -62,6 +63,7 @@ namespace Mist { IPC::sharedPage encryptionPage; void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate = false); + uint32_t negTimer; ///< How long we've been negotiating, in packets. }; ///\brief Class containing all basic input and output functions. @@ -70,11 +72,9 @@ namespace Mist { void initiateMeta(); bool bufferStart(unsigned long tid, unsigned long pageNumber); void bufferNext(DTSC::Packet & pack); - void bufferNext(JSON::Value & pack); void bufferFinalize(unsigned long tid); void bufferRemove(unsigned long tid, unsigned long pageNumber); - void bufferLivePacket(JSON::Value & packet); - void bufferLivePacket(DTSC::Packet & packet); + virtual void bufferLivePacket(DTSC::Packet & packet); protected: void continueNegotiate(unsigned long tid, bool quickNegotiate = false); diff --git a/src/output/output.cpp b/src/output/output.cpp index 2e60c8cc..f5e9e365 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -45,7 +45,17 @@ namespace Mist{ cfg->addOption("noinput", option); } + void Output::bufferLivePacket(DTSC::Packet & packet){ + if (nProxy.negTimer > 600){ + WARN_MSG("No negotiation response from buffer - reconnecting."); + nProxy.clear(); + reconnect(); + } + InOutBase::bufferLivePacket(packet); + } + Output::Output(Socket::Connection & conn) : myConn(conn){ + pushing = false; firstTime = 0; crc = getpid(); parseData = false; @@ -82,8 +92,8 @@ namespace Mist{ } void Output::updateMeta(){ - //cancel if not alive - if (!nProxy.userClient.isAlive()){ + //cancel if not alive or pushing a new stream + if (!nProxy.userClient.isAlive() || (isPushing() && myMeta.tracks.size())){ return; } //read metadata from page to myMeta variable @@ -273,6 +283,7 @@ namespace Mist{ } bool Output::isReadyForPlay(){ + if (isPushing()){return true;} if (myMeta.tracks.size()){ if (!selectedTracks.size()){ selectDefaultTracks(); @@ -462,7 +473,7 @@ namespace Mist{ MEDIUM_MSG("Selected tracks: %s (%lu)", selected.str().c_str(), selectedTracks.size()); } - if (!selectedTracks.size() && myMeta.tracks.size()){ + if (!selectedTracks.size() && myMeta.tracks.size() && capa["codecs"][bestSoFar].size()){ WARN_MSG("No tracks selected (%u total) for stream %s!", myMeta.tracks.size(), streamName.c_str()); } } @@ -1167,6 +1178,9 @@ namespace Mist{ /// Outputs used as an input should return INPUT, outputs used for automation should return OUTPUT, others should return their proper name. /// The default implementation is usually good enough for all the non-INPUT types. std::string Output::getStatsName(){ + if (isPushing()){ + return "INPUT"; + } if (config->hasOption("target") && config->getString("target").size()){ return "OUTPUT"; }else{ @@ -1228,7 +1242,7 @@ namespace Mist{ myConn.close(); return; } - if (!nProxy.trackMap.size()){ + if (!isPushing()){ IPC::userConnection userConn(nProxy.userClient.getData()); for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){ userConn.setTrackId(tNum, *it); @@ -1276,6 +1290,7 @@ namespace Mist{ /// Runs all appropriate triggers and checks. /// Returns true if the push should continue, false otherwise. bool Output::allowPush(const std::string & passwd){ + pushing = true; std::string strmSource; // Initialize the stream source if needed, connect to it @@ -1285,10 +1300,12 @@ namespace Mist{ if (!strmSource.size()){ FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); + pushing = false; return false; } if (strmSource.substr(0, 7) != "push://"){ FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str()); + pushing = false; return false; } @@ -1317,6 +1334,7 @@ namespace Mist{ std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl; if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){ FAIL_MSG("Push from %s to %s rejected - STREAM_PUSH trigger denied the push", getConnectedHost().c_str(), streamName.c_str()); + pushing = false; return false; } } @@ -1325,6 +1343,7 @@ namespace Mist{ if (IP != ""){ if (!myConn.isAddress(IP)){ FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); + pushing = false; return false; } } diff --git a/src/output/output.h b/src/output/output.h index 5267a418..ff249a4b 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -99,6 +99,7 @@ namespace Mist { std::set buffer;///< A sorted list of next-to-be-loaded packets. bool sought;/// bookKeeping; virtual bool isRecording(){return false;}; + virtual bool isPushing(){return pushing;}; bool allowPush(const std::string & passwd); + void bufferLivePacket(DTSC::Packet & packet); }; } diff --git a/src/output/output_dtsc.cpp b/src/output/output_dtsc.cpp index 3d90da69..333635da 100644 --- a/src/output/output_dtsc.cpp +++ b/src/output/output_dtsc.cpp @@ -28,7 +28,6 @@ namespace Mist { Bit::htobl(sSize, prep.packedSize()); myConn.SendNow(sSize, 4); prep.sendTo(myConn); - pushing = false; lastActive = Util::epoch(); } @@ -174,9 +173,7 @@ namespace Mist { streamName = dScan.getMember("stream").asString(); std::string passString = dScan.getMember("password").asString(); Util::sanitizeName(streamName); - pushing = true; if (!allowPush(passString)){ - pushing = false; myConn.close(); return; } diff --git a/src/output/output_dtsc.h b/src/output/output_dtsc.h index df379b53..d1236e88 100644 --- a/src/output/output_dtsc.h +++ b/src/output/output_dtsc.h @@ -16,7 +16,6 @@ namespace Mist { unsigned int lastActive;///