From 69cf17d01d04b37d485e17e94b026b51ec85bb31 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 14 Apr 2015 15:30:25 +0200 Subject: [PATCH] Increased maximum simultaneous tracks from 5 to 10, made this a define option. Allow multiple pushes through a single RTMP connection. --- lib/defines.h | 3 +++ lib/shared_memory.h | 3 ++- src/input/input_buffer.cpp | 23 ++++++++++------ src/io.cpp | 4 +-- src/output/output.cpp | 8 +++--- src/output/output.h | 2 -- src/output/output_rtmp.cpp | 54 +++++++++++++++++++------------------- src/output/output_rtmp.h | 18 ++++++++++--- 8 files changed, 68 insertions(+), 47 deletions(-) diff --git a/lib/defines.h b/lib/defines.h index 0fcca3ec..ab4f9a77 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -1,3 +1,4 @@ +#pragma once // Defines to print debug messages. #ifndef MIST_DEBUG #define MIST_DEBUG 1 @@ -77,3 +78,5 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #define SEM_LIVE "MstLIVE%s" //%s stream name #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames +#define SIMUL_TRACKS 10 + diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 4567927e..7ac2d3df 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -3,6 +3,7 @@ #include #include "timing.h" +#include "defines.h" #ifdef __CYGWIN__ #include @@ -11,7 +12,7 @@ #endif #define STAT_EX_SIZE 172 -#define PLAY_EX_SIZE 32 +#define PLAY_EX_SIZE 2+6*SIMUL_TRACKS namespace IPC { diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 72ae71d9..a97d3f36 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -246,12 +246,11 @@ namespace Mist { static int nextTempId = 1001; //Get the counter of this user char counter = (*(data - 1)); - //Each user can have at maximum 5 elements in their userpage. - for (int index = 0; index < 5; index++) { + //Each user can have at maximum SIMUL_TRACKS elements in their userpage. + for (int index = 0; index < SIMUL_TRACKS; index++) { char * thisData = data + (index * 6); //Get the track id from the current element unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3]; - //Skip value 0xFFFFFFFF as this indicates a previously declined track if (value == 0xFFFFFFFF) { continue; @@ -262,7 +261,7 @@ namespace Mist { } //If the current value indicates a valid trackid, and it is pushed from this user - if (pushLocation[value] == thisData) { + if (pushLocation[value] == data) { //Check for timeouts, and erase the track if necessary if (counter == 126 || counter == 127 || counter == 254 || counter == 255) { pushLocation.erase(value); @@ -271,7 +270,7 @@ namespace Mist { metaPages[value].master = true; metaPages.erase(value); } - if (data[4] == 0xFF && data[5] == 0xFF && activeTracks.count(value)) { + if (activeTracks.count(value)) { activeTracks.erase(value); bufferLocations.erase(value); } @@ -347,6 +346,14 @@ namespace Mist { } else { INFO_MSG("New track detected, assigned track id %d, coming from temporary track %lu of user %u", finalMap, value, id); } + } else { + //Otherwise replace existing track + INFO_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); + myMeta.tracks.erase(finalMap); + //Set master to true before erasing the page, because we are responsible for cleaning up unused pages + metaPages[finalMap].master = true; + metaPages.erase(finalMap); + bufferLocations.erase(finalMap); } //Register the new track as an active track. @@ -354,7 +361,7 @@ namespace Mist { //Register the time of registration as initial value for the lastUpdated field. lastUpdated[finalMap] = Util::bootSecs(); //Register the user thats is pushing this element - pushLocation[finalMap] = thisData; + pushLocation[finalMap] = data; //Initialize the metadata for this track if it was not in place yet. if (!myMeta.tracks.count(finalMap)) { DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap); @@ -375,7 +382,7 @@ namespace Mist { updateMeta(); } //If the track is active, and this is the element responsible for pushing it - if (activeTracks.count(value) && pushLocation[value] == thisData) { + if (activeTracks.count(value) && pushLocation[value] == data) { //Open the track index page if we dont have it open yet if (!metaPages.count(value) || !metaPages[value].mapped) { char firstPage[NAME_BUFFER_SIZE]; @@ -440,7 +447,7 @@ namespace Mist { curPage[tNum].init(nextPageName, 20971520); //If the page can not be opened, stop here if (!curPage[tNum].mapped) { - ///\todo Maybe generate a warning here? + WARN_MSG("Could not open page: %s", nextPageName); return; } curPageNum[tNum] = pageNum; diff --git a/src/io.cpp b/src/io.cpp index 068eacce..c55948e2 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -407,12 +407,12 @@ namespace Mist { return; } if (!trackOffset.count(tid)) { - if (trackOffset.size() >= 5) { + if (trackOffset.size() > SIMUL_TRACKS) { INFO_MSG("Trackoffset too high"); return; } //Find a free offset for the new track - for (int i = 0; i < 5; i++) { + for (int i = 0; i < SIMUL_TRACKS; i++) { bool isFree = true; for (std::map::iterator it = trackOffset.begin(); it != trackOffset.end(); it++) { if (it->second == i) { diff --git a/src/output/output.cpp b/src/output/output.cpp index a45a2733..83860eca 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -619,7 +619,7 @@ namespace Mist { } } if (trackMap.size()){ - for (std::map::iterator it = trackMap.begin(); it != trackMap.end() && tNum < 5; it++){ + for (std::map::iterator it = trackMap.begin(); it != trackMap.end() && tNum < SIMUL_TRACKS; it++){ unsigned int tId = it->second; char * thisData = userClient.getData() + (6 * tNum); thisData[0] = ((tId >> 24) & 0xFF); @@ -631,7 +631,7 @@ namespace Mist { tNum ++; } }else{ - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < 5; it++){ + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){ unsigned int tId = *it; char * thisData = userClient.getData() + (6 * tNum); thisData[0] = ((tId >> 24) & 0xFF); @@ -644,8 +644,8 @@ namespace Mist { } } userClient.keepAlive(); - if (tNum >= 5){ - DEBUG_MSG(DLVL_WARN, "Too many tracks selected, using only first 5"); + if (tNum > SIMUL_TRACKS){ + DEBUG_MSG(DLVL_WARN, "Too many tracks selected, using only first %d", SIMUL_TRACKS); } } diff --git a/src/output/output.h b/src/output/output.h index 922a0c9a..91e73cb1 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -91,8 +91,6 @@ namespace Mist { bool isInitialized;///< If false, triggers initialization if parseData is true. bool sentHeader;///< If false, triggers sendHeader if parseData is true. - DTSC::Meta meta_out; - std::deque preBuf; std::map bookKeeping; }; diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 239ca72e..c73e264c 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -30,9 +30,6 @@ namespace Mist { DEBUG_MSG(DLVL_DEVEL, "Handshake fail!"); } setBlocking(false); - counter = 0; - sending = false; - streamReset = false; maxSkipAhead = 1500; minSkipAhead = 500; } @@ -448,6 +445,9 @@ namespace Mist { if (amfData.getContentP(3)) { streamName = amfData.getContentP(3)->StrValue(); + if (streamName.find('/')){ + streamName = streamName.substr(0, streamName.find('/')); + } size_t colonPos = streamName.find(':'); if (colonPos != std::string::npos && colonPos < 6){ @@ -792,39 +792,40 @@ namespace Mist { case 8: //audio data case 9: //video data case 18: {//meta data + pushData & p = pushes[next.cs_id]; if (!isInitialized) { DEBUG_MSG(DLVL_MEDIUM, "Received useless media data\n"); myConn.close(); break; } - if (streamReset) { - //reset push data to empty, in case stream properties change - meta_out.reset(); - preBuf.clear(); - sending = false; - counter = 0; - streamReset = false; - } F.ChunkLoader(next); - JSON::Value pack_out = F.toJSON(meta_out); + JSON::Value pack_out = F.toJSON(p.meta); if ( !pack_out.isNull()){ //Check for backwards timestamps - if (pack_out["time"].asInt() < meta_out.tracks[pack_out["trackid"].asInt()].lastms){ + if (pack_out["time"].asInt() < p.meta.tracks[pack_out["trackid"].asInt()].lastms){ ///Reset all internals - sending = false; - counter = 0; - preBuf.clear(); - meta_out = DTSC::Meta(); - pack_out = F.toJSON(meta_out);//Reinitialize the metadata with this packet. + p.sending = false; + p.counter = 0; + p.preBuf.clear(); + p.meta = DTSC::Meta(); + pack_out = F.toJSON(p.meta);//Reinitialize the metadata with this packet. ///Reset negotiation with buffer userClient.finish(); userClient = IPC::sharedClient(streamName + "_users", PLAY_EX_SIZE, true); } - if ( !sending){ - counter++; - if (counter > 8){ - sending = true; - myMeta = meta_out; + pack_out["trackid"] = pack_out["trackid"].asInt() + next.cs_id * 3; + if ( !p.sending){ + p.counter++; + if (p.counter > 8){ + p.sending = true; + if (myMeta.tracks.count(1)){ + myMeta = DTSC::Meta(); + } + for (unsigned int i = 1; i < 4; ++i){ + if (p.meta.tracks.count(i)){ + myMeta.tracks[next.cs_id*3+i] = p.meta.tracks[i]; + } + } if (!userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); @@ -834,14 +835,13 @@ namespace Mist { DEBUG_MSG(DLVL_MEDIUM, "Starting negotiation for track %d", it->first); continueNegotiate(it->first); } - //negotiatePushTracks(); - for (std::deque::iterator it = preBuf.begin(); it != preBuf.end(); it++){ + for (std::deque::iterator it = p.preBuf.begin(); it != p.preBuf.end(); it++){ bufferLivePacket((*it)); } - preBuf.clear(); //clear buffer + p.preBuf.clear(); //clear buffer bufferLivePacket(pack_out); }else{ - preBuf.push_back(pack_out); + p.preBuf.push_back(pack_out); } }else{ bufferLivePacket(pack_out); diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 35a6831b..110d1984 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -5,6 +5,20 @@ namespace Mist { + + class pushData { + public: + DTSC::Meta meta; + bool sending; + int counter; + std::deque preBuf; + pushData(){ + sending = false; + counter = 0; + } + }; + + class OutRTMP : public Output { public: OutRTMP(Socket::Connection & conn); @@ -16,12 +30,10 @@ namespace Mist { protected: void parseVars(std::string data); std::string app_name; - bool sending; - int counter; - bool streamReset; void parseChunk(Socket::Buffer & inputBuffer); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId); void sendCommand(AMF::Object & amfReply, int messageType, int streamId); + std::map pushes; }; }