bufferLivePacket improvements for generic Outputs

This commit is contained in:
Thulinma 2017-03-16 17:12:54 +01:00
parent bd2f1724f6
commit eef9303e61
8 changed files with 58 additions and 59 deletions

View file

@ -1196,13 +1196,13 @@ namespace IPC {
} }
if (!hasCounter) { if (!hasCounter) {
DEBUG_MSG(DLVL_WARN, "Trying to time-out an element without counters"); DEBUG_MSG(DLVL_WARN, "Trying to time-out an element without counters");
myPage.close();
return; return;
} }
if (myPage.mapped) { semGuard tmpGuard(&mySemaphore);
semGuard tmpGuard(&mySemaphore); myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0);
myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0); HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1));
HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1)); myPage.close();
}
} }
///\brief Re-initialize the counter ///\brief Re-initialize the counter

View file

@ -29,6 +29,18 @@ namespace Mist {
} }
void negotiationProxy::clear(){
pagesByTrack.clear();
trackOffset.clear();
trackState.clear();
trackMap.clear();
metaPages.clear();
curPageNum.clear();
curPage.clear();
negTimer = 0;
userClient.finish();
}
bool InOutBase::bufferStart(unsigned long tid, unsigned long pageNumber) { bool InOutBase::bufferStart(unsigned long tid, unsigned long pageNumber) {
VERYHIGH_MSG("bufferStart for stream %s, track %lu, page %lu", streamName.c_str(), tid, pageNumber); VERYHIGH_MSG("bufferStart for stream %s, track %lu, page %lu", streamName.c_str(), tid, pageNumber);
//Initialize the stream metadata if it does not yet exist //Initialize the stream metadata if it does not yet exist
@ -239,15 +251,6 @@ namespace Mist {
return 0; return 0;
} }
///Buffers the next packet on the currently opened page
///\param pack The packet to buffer
void InOutBase::bufferNext(JSON::Value & pack) {
std::string packData = pack.toNetPacked();
DTSC::Packet newPack(packData.data(), packData.size());
///\note Internally calls bufferNext(DTSC::Packet & pack)
nProxy.bufferNext(newPack, myMeta);
}
///Buffers the next packet on the currently opened page ///Buffers the next packet on the currently opened page
///\param pack The packet to buffer ///\param pack The packet to buffer
void InOutBase::bufferNext(DTSC::Packet & pack) { void InOutBase::bufferNext(DTSC::Packet & pack) {
@ -384,19 +387,6 @@ namespace Mist {
curPageNum.erase(tid); curPageNum.erase(tid);
} }
///Buffers a live packet to a page.
///
///Handles both buffering and creation of new pages
///
///Initiates/continues negotiation with the buffer as well
///\param packet The packet to buffer
void InOutBase::bufferLivePacket(JSON::Value & packet) {
DTSC::Packet realPacket;
realPacket.genericFill(packet["time"].asInt(), packet["offset"].asInt(), packet["trackid"].asInt(), packet["data"].asStringRef().c_str(), packet["data"].asStringRef().size(), packet["bpos"].asInt(), packet["keyframe"].asInt());
bufferLivePacket(realPacket);
}
///Buffers a live packet to a page. ///Buffers a live packet to a page.
/// ///
///Handles both buffering and creation of new pages ///Handles both buffering and creation of new pages
@ -531,6 +521,10 @@ namespace Mist {
nProxy.continueNegotiate(tid, myMeta, quickNegotiate); nProxy.continueNegotiate(tid, myMeta, quickNegotiate);
} }
negotiationProxy::negotiationProxy(){
negTimer = 0;
}
void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate) { void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate) {
if (!tid) { if (!tid) {
return; return;
@ -639,9 +633,11 @@ namespace Mist {
INSANE_MSG("NewTid: %0.8lX", newTid); INSANE_MSG("NewTid: %0.8lX", newTid);
if (newTid == 0x80000000u) { if (newTid == 0x80000000u) {
INSANE_MSG("Breaking because not set yet"); INSANE_MSG("Breaking because not set yet");
negTimer++;
break; break;
} }
HIGH_MSG("Track %lu temporarily mapped to %lu", tid, newTid); HIGH_MSG("Track %lu temporarily mapped to %lu", tid, newTid);
negTimer = 0;
char pageName[NAME_BUFFER_SIZE]; char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), newTid); snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), newTid);
@ -669,6 +665,7 @@ namespace Mist {
unsigned long firstPage = firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5]; unsigned long firstPage = firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5];
if (firstPage == 0xFFFF) { if (firstPage == 0xFFFF) {
HIGH_MSG("Negotiating, but firstPage not yet set, waiting for buffer"); HIGH_MSG("Negotiating, but firstPage not yet set, waiting for buffer");
negTimer++;
break; break;
} }
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
@ -682,6 +679,7 @@ namespace Mist {
trackMap.erase(tid); trackMap.erase(tid);
break; break;
} }
negTimer = 0;
//Reinitialize so we can be sure we got the right values here //Reinitialize so we can be sure we got the right values here
finalTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3]; finalTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3];
firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5]; firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5];

View file

@ -27,7 +27,8 @@ namespace Mist {
class negotiationProxy { class negotiationProxy {
public: public:
negotiationProxy() {} negotiationProxy();
void clear();
bool bufferStart(unsigned long tid, unsigned long pageNumber, DTSC::Meta & myMeta); bool bufferStart(unsigned long tid, unsigned long pageNumber, DTSC::Meta & myMeta);
void bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta); void bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta);
void bufferFinalize(unsigned long tid, DTSC::Meta &myMeta); void bufferFinalize(unsigned long tid, DTSC::Meta &myMeta);
@ -55,6 +56,7 @@ namespace Mist {
std::string streamName;///< Name of the stream to connect to std::string streamName;///< Name of the stream to connect to
void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate = false); void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate = false);
uint32_t negTimer; ///< How long we've been negotiating, in packets.
}; };
///\brief Class containing all basic input and output functions. ///\brief Class containing all basic input and output functions.
@ -63,11 +65,9 @@ namespace Mist {
void initiateMeta(); void initiateMeta();
bool bufferStart(unsigned long tid, unsigned long pageNumber); bool bufferStart(unsigned long tid, unsigned long pageNumber);
void bufferNext(DTSC::Packet & pack); void bufferNext(DTSC::Packet & pack);
void bufferNext(JSON::Value & pack);
void bufferFinalize(unsigned long tid); void bufferFinalize(unsigned long tid);
void bufferRemove(unsigned long tid, unsigned long pageNumber); void bufferRemove(unsigned long tid, unsigned long pageNumber);
void bufferLivePacket(JSON::Value & packet); virtual void bufferLivePacket(DTSC::Packet & packet);
void bufferLivePacket(DTSC::Packet & packet);
protected: protected:
void continueNegotiate(unsigned long tid, bool quickNegotiate = false); void continueNegotiate(unsigned long tid, bool quickNegotiate = false);

View file

@ -38,7 +38,17 @@ namespace Mist{
cfg->addOption("noinput", option); cfg->addOption("noinput", option);
} }
void Output::bufferLivePacket(DTSC::Packet & packet){
if (nProxy.negTimer > 600){
WARN_MSG("No negotiation response from buffer - reconnecting.");
nProxy.clear();
reconnect();
}
InOutBase::bufferLivePacket(packet);
}
Output::Output(Socket::Connection & conn) : myConn(conn){ Output::Output(Socket::Connection & conn) : myConn(conn){
pushing = false;
firstTime = 0; firstTime = 0;
crc = getpid(); crc = getpid();
parseData = false; parseData = false;
@ -74,8 +84,8 @@ namespace Mist{
} }
void Output::updateMeta(){ void Output::updateMeta(){
//cancel if not alive //cancel if not alive or pushing a new stream
if (!nProxy.userClient.isAlive()){ if (!nProxy.userClient.isAlive() || (isPushing() && myMeta.tracks.size())){
return; return;
} }
//read metadata from page to myMeta variable //read metadata from page to myMeta variable
@ -144,6 +154,7 @@ namespace Mist{
} }
bool Output::isReadyForPlay(){ bool Output::isReadyForPlay(){
if (isPushing()){return true;}
if (myMeta.tracks.size()){ if (myMeta.tracks.size()){
if (!selectedTracks.size()){ if (!selectedTracks.size()){
selectDefaultTracks(); selectDefaultTracks();
@ -992,6 +1003,9 @@ namespace Mist{
/// Outputs used as an input should return INPUT, outputs used for automation should return OUTPUT, others should return their proper name. /// Outputs used as an input should return INPUT, outputs used for automation should return OUTPUT, others should return their proper name.
/// The default implementation is usually good enough for all the non-INPUT types. /// The default implementation is usually good enough for all the non-INPUT types.
std::string Output::getStatsName(){ std::string Output::getStatsName(){
if (isPushing()){
return "INPUT";
}
if (config->hasOption("target") && config->getString("target").size()){ if (config->hasOption("target") && config->getString("target").size()){
return "OUTPUT"; return "OUTPUT";
}else{ }else{
@ -1045,7 +1059,7 @@ namespace Mist{
myConn.close(); myConn.close();
return; return;
} }
if (!nProxy.trackMap.size()){ if (!isPushing()){
IPC::userConnection userConn(nProxy.userClient.getData()); IPC::userConnection userConn(nProxy.userClient.getData());
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){ for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){
userConn.setTrackId(tNum, *it); userConn.setTrackId(tNum, *it);
@ -1092,6 +1106,7 @@ namespace Mist{
/// Runs all appropriate triggers and checks. /// Runs all appropriate triggers and checks.
/// Returns true if the push should continue, false otherwise. /// Returns true if the push should continue, false otherwise.
bool Output::allowPush(const std::string & passwd){ bool Output::allowPush(const std::string & passwd){
pushing = true;
std::string strmSource; std::string strmSource;
// Initialize the stream source if needed, connect to it // Initialize the stream source if needed, connect to it
@ -1101,18 +1116,22 @@ namespace Mist{
if (!strmSource.size()){ if (!strmSource.size()){
FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str());
pushing = false;
return false; return false;
} }
if (strmSource.substr(0, 7) != "push://"){ if (strmSource.substr(0, 7) != "push://"){
FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str()); FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str());
pushing = false;
return false; return false;
} }
std::string source = strmSource.substr(7); std::string source = strmSource.substr(7);
std::string IP = source.substr(0, source.find('@')); std::string IP = source.substr(0, source.find('@'));
if (IP != ""){ if (IP != ""){
if (!myConn.isAddress(IP)){ if (!myConn.isAddress(IP)){
FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str());
pushing = false;
return false; return false;
} }
} }

View file

@ -84,8 +84,9 @@ namespace Mist {
std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes. std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes.
std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets. std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets.
bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext(). bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext().
uint64_t lastRecv;
protected://these are to be messed with by child classes protected://these are to be messed with by child classes
bool pushing;
uint64_t lastRecv;
virtual std::string getConnectedHost(); virtual std::string getConnectedHost();
virtual std::string getConnectedBinHost(); virtual std::string getConnectedBinHost();
virtual std::string getStatsName(); virtual std::string getStatsName();
@ -111,7 +112,9 @@ namespace Mist {
bool sentHeader;///< If false, triggers sendHeader if parseData is true. bool sentHeader;///< If false, triggers sendHeader if parseData is true.
std::map<int,DTSCPageData> bookKeeping; std::map<int,DTSCPageData> bookKeeping;
virtual bool isPushing(){return pushing;};
bool allowPush(const std::string & passwd); bool allowPush(const std::string & passwd);
void bufferLivePacket(DTSC::Packet & packet);
}; };
} }

View file

@ -53,9 +53,9 @@ namespace Mist {
Output::onFail(); Output::onFail();
} }
/// We assume it's ready to play if there is at least one track available /// The HTTP output is always ready to play
bool OutHTTP::isReadyForPlay() { bool OutHTTP::isReadyForPlay() {
return myMeta.tracks.size(); return true;
} }
void OutHTTP::init(Util::Config * cfg){ void OutHTTP::init(Util::Config * cfg){
@ -68,7 +68,7 @@ namespace Mist {
capa["url_match"].append("/crossdomain.xml"); capa["url_match"].append("/crossdomain.xml");
capa["url_match"].append("/clientaccesspolicy.xml"); capa["url_match"].append("/clientaccesspolicy.xml");
capa["url_match"].append("/$.html"); capa["url_match"].append("/$.html");
capa["url_match"].append("/$.ico"); capa["url_match"].append("/favicon.ico");
capa["url_match"].append("/$.smil"); capa["url_match"].append("/$.smil");
capa["url_match"].append("/info_$.js"); capa["url_match"].append("/info_$.js");
capa["url_match"].append("/json_$.js"); capa["url_match"].append("/json_$.js");

View file

@ -9,7 +9,6 @@
namespace Mist { namespace Mist {
OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) { OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) {
isPushing = false;
setBlocking(true); setBlocking(true);
while (!conn.Received().available(1537) && conn.connected() && config->is_active) { while (!conn.Received().available(1537) && conn.connected() && config->is_active) {
conn.spool(); conn.spool();
@ -36,21 +35,6 @@ namespace Mist {
minSkipAhead = 500; minSkipAhead = 500;
} }
bool OutRTMP::isReadyForPlay(){
if (isPushing){
return true;
}
return Output::isReadyForPlay();
}
std::string OutRTMP::getStatsName(){
if (isPushing){
return "INPUT";
}else{
return Output::getStatsName();
}
}
bool OutRTMP::onFinish(){ bool OutRTMP::onFinish(){
MEDIUM_MSG("Finishing stream %s, %s", streamName.c_str(), myConn?"while connected":"already disconnected"); MEDIUM_MSG("Finishing stream %s, %s", streamName.c_str(), myConn?"while connected":"already disconnected");
if (myConn){ if (myConn){
@ -587,9 +571,7 @@ namespace Mist {
Util::sanitizeName(streamName); Util::sanitizeName(streamName);
isPushing = true;
if (!allowPush("")){ if (!allowPush("")){
isPushing = false;
onFinish(); onFinish();
return; return;
} }

View file

@ -13,17 +13,14 @@ namespace Mist {
void onRequest(); void onRequest();
void sendNext(); void sendNext();
void sendHeader(); void sendHeader();
bool isReadyForPlay();
bool onFinish(); bool onFinish();
protected: protected:
uint64_t rtmpOffset; uint64_t rtmpOffset;
bool isPushing;
void parseVars(std::string data); void parseVars(std::string data);
std::string app_name; std::string app_name;
void parseChunk(Socket::Buffer & inputBuffer); void parseChunk(Socket::Buffer & inputBuffer);
void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);
void sendCommand(AMF::Object & amfReply, int messageType, int streamId); void sendCommand(AMF::Object & amfReply, int messageType, int streamId);
std::string getStatsName();
}; };
} }