Increased maximum simultaneous tracks from 5 to 10, made this a define option. Allow multiple pushes through a single RTMP connection.

This commit is contained in:
Thulinma 2015-04-14 15:30:25 +02:00
parent 8d377602be
commit 69cf17d01d
8 changed files with 68 additions and 47 deletions

View file

@ -1,3 +1,4 @@
#pragma once
// Defines to print debug messages. // Defines to print debug messages.
#ifndef MIST_DEBUG #ifndef MIST_DEBUG
#define MIST_DEBUG 1 #define MIST_DEBUG 1
@ -77,3 +78,5 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", "
#define SEM_LIVE "MstLIVE%s" //%s stream name #define SEM_LIVE "MstLIVE%s" //%s stream name
#define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames
#define SIMUL_TRACKS 10

View file

@ -3,6 +3,7 @@
#include <set> #include <set>
#include "timing.h" #include "timing.h"
#include "defines.h"
#ifdef __CYGWIN__ #ifdef __CYGWIN__
#include <windows.h> #include <windows.h>
@ -11,7 +12,7 @@
#endif #endif
#define STAT_EX_SIZE 172 #define STAT_EX_SIZE 172
#define PLAY_EX_SIZE 32 #define PLAY_EX_SIZE 2+6*SIMUL_TRACKS
namespace IPC { namespace IPC {

View file

@ -246,12 +246,11 @@ namespace Mist {
static int nextTempId = 1001; static int nextTempId = 1001;
//Get the counter of this user //Get the counter of this user
char counter = (*(data - 1)); char counter = (*(data - 1));
//Each user can have at maximum 5 elements in their userpage. //Each user can have at maximum SIMUL_TRACKS elements in their userpage.
for (int index = 0; index < 5; index++) { for (int index = 0; index < SIMUL_TRACKS; index++) {
char * thisData = data + (index * 6); char * thisData = data + (index * 6);
//Get the track id from the current element //Get the track id from the current element
unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3]; unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3];
//Skip value 0xFFFFFFFF as this indicates a previously declined track //Skip value 0xFFFFFFFF as this indicates a previously declined track
if (value == 0xFFFFFFFF) { if (value == 0xFFFFFFFF) {
continue; continue;
@ -262,7 +261,7 @@ namespace Mist {
} }
//If the current value indicates a valid trackid, and it is pushed from this user //If the current value indicates a valid trackid, and it is pushed from this user
if (pushLocation[value] == thisData) { if (pushLocation[value] == data) {
//Check for timeouts, and erase the track if necessary //Check for timeouts, and erase the track if necessary
if (counter == 126 || counter == 127 || counter == 254 || counter == 255) { if (counter == 126 || counter == 127 || counter == 254 || counter == 255) {
pushLocation.erase(value); pushLocation.erase(value);
@ -271,7 +270,7 @@ namespace Mist {
metaPages[value].master = true; metaPages[value].master = true;
metaPages.erase(value); metaPages.erase(value);
} }
if (data[4] == 0xFF && data[5] == 0xFF && activeTracks.count(value)) { if (activeTracks.count(value)) {
activeTracks.erase(value); activeTracks.erase(value);
bufferLocations.erase(value); bufferLocations.erase(value);
} }
@ -347,6 +346,14 @@ namespace Mist {
} else { } else {
INFO_MSG("New track detected, assigned track id %d, coming from temporary track %lu of user %u", finalMap, value, id); INFO_MSG("New track detected, assigned track id %d, coming from temporary track %lu of user %u", finalMap, value, id);
} }
} else {
//Otherwise replace existing track
INFO_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
myMeta.tracks.erase(finalMap);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
metaPages[finalMap].master = true;
metaPages.erase(finalMap);
bufferLocations.erase(finalMap);
} }
//Register the new track as an active track. //Register the new track as an active track.
@ -354,7 +361,7 @@ namespace Mist {
//Register the time of registration as initial value for the lastUpdated field. //Register the time of registration as initial value for the lastUpdated field.
lastUpdated[finalMap] = Util::bootSecs(); lastUpdated[finalMap] = Util::bootSecs();
//Register the user thats is pushing this element //Register the user thats is pushing this element
pushLocation[finalMap] = thisData; pushLocation[finalMap] = data;
//Initialize the metadata for this track if it was not in place yet. //Initialize the metadata for this track if it was not in place yet.
if (!myMeta.tracks.count(finalMap)) { if (!myMeta.tracks.count(finalMap)) {
DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap); DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap);
@ -375,7 +382,7 @@ namespace Mist {
updateMeta(); updateMeta();
} }
//If the track is active, and this is the element responsible for pushing it //If the track is active, and this is the element responsible for pushing it
if (activeTracks.count(value) && pushLocation[value] == thisData) { if (activeTracks.count(value) && pushLocation[value] == data) {
//Open the track index page if we dont have it open yet //Open the track index page if we dont have it open yet
if (!metaPages.count(value) || !metaPages[value].mapped) { if (!metaPages.count(value) || !metaPages[value].mapped) {
char firstPage[NAME_BUFFER_SIZE]; char firstPage[NAME_BUFFER_SIZE];
@ -440,7 +447,7 @@ namespace Mist {
curPage[tNum].init(nextPageName, 20971520); curPage[tNum].init(nextPageName, 20971520);
//If the page can not be opened, stop here //If the page can not be opened, stop here
if (!curPage[tNum].mapped) { if (!curPage[tNum].mapped) {
///\todo Maybe generate a warning here? WARN_MSG("Could not open page: %s", nextPageName);
return; return;
} }
curPageNum[tNum] = pageNum; curPageNum[tNum] = pageNum;

View file

@ -407,12 +407,12 @@ namespace Mist {
return; return;
} }
if (!trackOffset.count(tid)) { if (!trackOffset.count(tid)) {
if (trackOffset.size() >= 5) { if (trackOffset.size() > SIMUL_TRACKS) {
INFO_MSG("Trackoffset too high"); INFO_MSG("Trackoffset too high");
return; return;
} }
//Find a free offset for the new track //Find a free offset for the new track
for (int i = 0; i < 5; i++) { for (int i = 0; i < SIMUL_TRACKS; i++) {
bool isFree = true; bool isFree = true;
for (std::map<unsigned long, unsigned long>::iterator it = trackOffset.begin(); it != trackOffset.end(); it++) { for (std::map<unsigned long, unsigned long>::iterator it = trackOffset.begin(); it != trackOffset.end(); it++) {
if (it->second == i) { if (it->second == i) {

View file

@ -619,7 +619,7 @@ namespace Mist {
} }
} }
if (trackMap.size()){ if (trackMap.size()){
for (std::map<unsigned long, unsigned long>::iterator it = trackMap.begin(); it != trackMap.end() && tNum < 5; it++){ for (std::map<unsigned long, unsigned long>::iterator it = trackMap.begin(); it != trackMap.end() && tNum < SIMUL_TRACKS; it++){
unsigned int tId = it->second; unsigned int tId = it->second;
char * thisData = userClient.getData() + (6 * tNum); char * thisData = userClient.getData() + (6 * tNum);
thisData[0] = ((tId >> 24) & 0xFF); thisData[0] = ((tId >> 24) & 0xFF);
@ -631,7 +631,7 @@ namespace Mist {
tNum ++; tNum ++;
} }
}else{ }else{
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < 5; it++){ for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){
unsigned int tId = *it; unsigned int tId = *it;
char * thisData = userClient.getData() + (6 * tNum); char * thisData = userClient.getData() + (6 * tNum);
thisData[0] = ((tId >> 24) & 0xFF); thisData[0] = ((tId >> 24) & 0xFF);
@ -644,8 +644,8 @@ namespace Mist {
} }
} }
userClient.keepAlive(); userClient.keepAlive();
if (tNum >= 5){ if (tNum > SIMUL_TRACKS){
DEBUG_MSG(DLVL_WARN, "Too many tracks selected, using only first 5"); DEBUG_MSG(DLVL_WARN, "Too many tracks selected, using only first %d", SIMUL_TRACKS);
} }
} }

View file

@ -91,8 +91,6 @@ namespace Mist {
bool isInitialized;///< If false, triggers initialization if parseData is true. bool isInitialized;///< If false, triggers initialization if parseData is true.
bool sentHeader;///< If false, triggers sendHeader if parseData is true. bool sentHeader;///< If false, triggers sendHeader if parseData is true.
DTSC::Meta meta_out;
std::deque<JSON::Value> preBuf;
std::map<int,DTSCPageData> bookKeeping; std::map<int,DTSCPageData> bookKeeping;
}; };

View file

@ -30,9 +30,6 @@ namespace Mist {
DEBUG_MSG(DLVL_DEVEL, "Handshake fail!"); DEBUG_MSG(DLVL_DEVEL, "Handshake fail!");
} }
setBlocking(false); setBlocking(false);
counter = 0;
sending = false;
streamReset = false;
maxSkipAhead = 1500; maxSkipAhead = 1500;
minSkipAhead = 500; minSkipAhead = 500;
} }
@ -448,6 +445,9 @@ namespace Mist {
if (amfData.getContentP(3)) { if (amfData.getContentP(3)) {
streamName = amfData.getContentP(3)->StrValue(); streamName = amfData.getContentP(3)->StrValue();
if (streamName.find('/')){
streamName = streamName.substr(0, streamName.find('/'));
}
size_t colonPos = streamName.find(':'); size_t colonPos = streamName.find(':');
if (colonPos != std::string::npos && colonPos < 6){ if (colonPos != std::string::npos && colonPos < 6){
@ -792,39 +792,40 @@ namespace Mist {
case 8: //audio data case 8: //audio data
case 9: //video data case 9: //video data
case 18: {//meta data case 18: {//meta data
pushData & p = pushes[next.cs_id];
if (!isInitialized) { if (!isInitialized) {
DEBUG_MSG(DLVL_MEDIUM, "Received useless media data\n"); DEBUG_MSG(DLVL_MEDIUM, "Received useless media data\n");
myConn.close(); myConn.close();
break; break;
} }
if (streamReset) {
//reset push data to empty, in case stream properties change
meta_out.reset();
preBuf.clear();
sending = false;
counter = 0;
streamReset = false;
}
F.ChunkLoader(next); F.ChunkLoader(next);
JSON::Value pack_out = F.toJSON(meta_out); JSON::Value pack_out = F.toJSON(p.meta);
if ( !pack_out.isNull()){ if ( !pack_out.isNull()){
//Check for backwards timestamps //Check for backwards timestamps
if (pack_out["time"].asInt() < meta_out.tracks[pack_out["trackid"].asInt()].lastms){ if (pack_out["time"].asInt() < p.meta.tracks[pack_out["trackid"].asInt()].lastms){
///Reset all internals ///Reset all internals
sending = false; p.sending = false;
counter = 0; p.counter = 0;
preBuf.clear(); p.preBuf.clear();
meta_out = DTSC::Meta(); p.meta = DTSC::Meta();
pack_out = F.toJSON(meta_out);//Reinitialize the metadata with this packet. pack_out = F.toJSON(p.meta);//Reinitialize the metadata with this packet.
///Reset negotiation with buffer ///Reset negotiation with buffer
userClient.finish(); userClient.finish();
userClient = IPC::sharedClient(streamName + "_users", PLAY_EX_SIZE, true); userClient = IPC::sharedClient(streamName + "_users", PLAY_EX_SIZE, true);
} }
if ( !sending){ pack_out["trackid"] = pack_out["trackid"].asInt() + next.cs_id * 3;
counter++; if ( !p.sending){
if (counter > 8){ p.counter++;
sending = true; if (p.counter > 8){
myMeta = meta_out; p.sending = true;
if (myMeta.tracks.count(1)){
myMeta = DTSC::Meta();
}
for (unsigned int i = 1; i < 4; ++i){
if (p.meta.tracks.count(i)){
myMeta.tracks[next.cs_id*3+i] = p.meta.tracks[i];
}
}
if (!userClient.getData()){ if (!userClient.getData()){
char userPageName[NAME_BUFFER_SIZE]; char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
@ -834,14 +835,13 @@ namespace Mist {
DEBUG_MSG(DLVL_MEDIUM, "Starting negotiation for track %d", it->first); DEBUG_MSG(DLVL_MEDIUM, "Starting negotiation for track %d", it->first);
continueNegotiate(it->first); continueNegotiate(it->first);
} }
//negotiatePushTracks(); for (std::deque<JSON::Value>::iterator it = p.preBuf.begin(); it != p.preBuf.end(); it++){
for (std::deque<JSON::Value>::iterator it = preBuf.begin(); it != preBuf.end(); it++){
bufferLivePacket((*it)); bufferLivePacket((*it));
} }
preBuf.clear(); //clear buffer p.preBuf.clear(); //clear buffer
bufferLivePacket(pack_out); bufferLivePacket(pack_out);
}else{ }else{
preBuf.push_back(pack_out); p.preBuf.push_back(pack_out);
} }
}else{ }else{
bufferLivePacket(pack_out); bufferLivePacket(pack_out);

View file

@ -5,6 +5,20 @@
namespace Mist { namespace Mist {
class pushData {
public:
DTSC::Meta meta;
bool sending;
int counter;
std::deque<JSON::Value> preBuf;
pushData(){
sending = false;
counter = 0;
}
};
class OutRTMP : public Output { class OutRTMP : public Output {
public: public:
OutRTMP(Socket::Connection & conn); OutRTMP(Socket::Connection & conn);
@ -16,12 +30,10 @@ namespace Mist {
protected: protected:
void parseVars(std::string data); void parseVars(std::string data);
std::string app_name; std::string app_name;
bool sending;
int counter;
bool streamReset;
void parseChunk(Socket::Buffer & inputBuffer); void parseChunk(Socket::Buffer & inputBuffer);
void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);
void sendCommand(AMF::Object & amfReply, int messageType, int streamId); void sendCommand(AMF::Object & amfReply, int messageType, int streamId);
std::map<unsigned int, pushData> pushes;
}; };
} }