bufferLivePacket improvements for generic Outputs
This commit is contained in:
parent
2717c21d4e
commit
746d982d28
12 changed files with 61 additions and 86 deletions
|
@ -1194,13 +1194,13 @@ namespace IPC {
|
||||||
}
|
}
|
||||||
if (!hasCounter) {
|
if (!hasCounter) {
|
||||||
DEBUG_MSG(DLVL_WARN, "Trying to time-out an element without counters");
|
DEBUG_MSG(DLVL_WARN, "Trying to time-out an element without counters");
|
||||||
|
myPage.close();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (myPage.mapped) {
|
|
||||||
semGuard tmpGuard(&mySemaphore);
|
semGuard tmpGuard(&mySemaphore);
|
||||||
myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0);
|
myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0);
|
||||||
HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1));
|
HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1));
|
||||||
}
|
myPage.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
///\brief Re-initialize the counter
|
///\brief Re-initialize the counter
|
||||||
|
|
43
src/io.cpp
43
src/io.cpp
|
@ -29,6 +29,18 @@ namespace Mist {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void negotiationProxy::clear(){
|
||||||
|
pagesByTrack.clear();
|
||||||
|
trackOffset.clear();
|
||||||
|
trackState.clear();
|
||||||
|
trackMap.clear();
|
||||||
|
metaPages.clear();
|
||||||
|
curPageNum.clear();
|
||||||
|
curPage.clear();
|
||||||
|
negTimer = 0;
|
||||||
|
userClient.finish();
|
||||||
|
}
|
||||||
|
|
||||||
/*LTS-START*/
|
/*LTS-START*/
|
||||||
void negotiationProxy::initiateEncryption(){
|
void negotiationProxy::initiateEncryption(){
|
||||||
static bool encInit = false;
|
static bool encInit = false;
|
||||||
|
@ -282,15 +294,6 @@ namespace Mist {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
///Buffers the next packet on the currently opened page
|
|
||||||
///\param pack The packet to buffer
|
|
||||||
void InOutBase::bufferNext(JSON::Value & pack) {
|
|
||||||
std::string packData = pack.toNetPacked();
|
|
||||||
DTSC::Packet newPack(packData.data(), packData.size());
|
|
||||||
///\note Internally calls bufferNext(DTSC::Packet & pack)
|
|
||||||
nProxy.bufferNext(newPack, myMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
///Buffers the next packet on the currently opened page
|
///Buffers the next packet on the currently opened page
|
||||||
///\param pack The packet to buffer
|
///\param pack The packet to buffer
|
||||||
void InOutBase::bufferNext(DTSC::Packet & pack) {
|
void InOutBase::bufferNext(DTSC::Packet & pack) {
|
||||||
|
@ -449,19 +452,6 @@ namespace Mist {
|
||||||
curPageNum.erase(tid);
|
curPageNum.erase(tid);
|
||||||
}
|
}
|
||||||
|
|
||||||
///Buffers a live packet to a page.
|
|
||||||
///
|
|
||||||
///Handles both buffering and creation of new pages
|
|
||||||
///
|
|
||||||
///Initiates/continues negotiation with the buffer as well
|
|
||||||
///\param packet The packet to buffer
|
|
||||||
void InOutBase::bufferLivePacket(JSON::Value & packet) {
|
|
||||||
DTSC::Packet realPacket;
|
|
||||||
realPacket.genericFill(packet["time"].asInt(), packet["offset"].asInt(), packet["trackid"].asInt(), packet["data"].asStringRef().c_str(), packet["data"].asStringRef().size(), packet["bpos"].asInt(), packet["keyframe"].asInt());
|
|
||||||
bufferLivePacket(realPacket);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
///Buffers a live packet to a page.
|
///Buffers a live packet to a page.
|
||||||
///
|
///
|
||||||
///Handles both buffering and creation of new pages
|
///Handles both buffering and creation of new pages
|
||||||
|
@ -596,6 +586,11 @@ namespace Mist {
|
||||||
nProxy.continueNegotiate(tid, myMeta, quickNegotiate);
|
nProxy.continueNegotiate(tid, myMeta, quickNegotiate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
negotiationProxy::negotiationProxy(){
|
||||||
|
encrypt = false;
|
||||||
|
negTimer = 0;
|
||||||
|
}
|
||||||
|
|
||||||
void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate) {
|
void negotiationProxy::continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate) {
|
||||||
if (!tid) {
|
if (!tid) {
|
||||||
return;
|
return;
|
||||||
|
@ -704,9 +699,11 @@ namespace Mist {
|
||||||
INSANE_MSG("NewTid: %0.8lX", newTid);
|
INSANE_MSG("NewTid: %0.8lX", newTid);
|
||||||
if (newTid == 0x80000000u) {
|
if (newTid == 0x80000000u) {
|
||||||
INSANE_MSG("Breaking because not set yet");
|
INSANE_MSG("Breaking because not set yet");
|
||||||
|
negTimer++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
HIGH_MSG("Track %lu temporarily mapped to %lu", tid, newTid);
|
HIGH_MSG("Track %lu temporarily mapped to %lu", tid, newTid);
|
||||||
|
negTimer = 0;
|
||||||
|
|
||||||
char pageName[NAME_BUFFER_SIZE];
|
char pageName[NAME_BUFFER_SIZE];
|
||||||
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), newTid);
|
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), newTid);
|
||||||
|
@ -734,6 +731,7 @@ namespace Mist {
|
||||||
unsigned long firstPage = firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5];
|
unsigned long firstPage = firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5];
|
||||||
if (firstPage == 0xFFFF) {
|
if (firstPage == 0xFFFF) {
|
||||||
HIGH_MSG("Negotiating, but firstPage not yet set, waiting for buffer");
|
HIGH_MSG("Negotiating, but firstPage not yet set, waiting for buffer");
|
||||||
|
negTimer++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
#if defined(__CYGWIN__) || defined(_WIN32)
|
#if defined(__CYGWIN__) || defined(_WIN32)
|
||||||
|
@ -747,6 +745,7 @@ namespace Mist {
|
||||||
trackMap.erase(tid);
|
trackMap.erase(tid);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
negTimer = 0;
|
||||||
//Reinitialize so we can be sure we got the right values here
|
//Reinitialize so we can be sure we got the right values here
|
||||||
finalTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3];
|
finalTid = ((long)(tmp[offset]) << 24) | ((long)(tmp[offset + 1]) << 16) | ((long)(tmp[offset + 2]) << 8) | tmp[offset + 3];
|
||||||
firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5];
|
firstPage = ((long)(tmp[offset + 4]) << 8) | tmp[offset + 5];
|
||||||
|
|
8
src/io.h
8
src/io.h
|
@ -28,7 +28,8 @@ namespace Mist {
|
||||||
|
|
||||||
class negotiationProxy {
|
class negotiationProxy {
|
||||||
public:
|
public:
|
||||||
negotiationProxy() : encrypt(false) {}
|
negotiationProxy();
|
||||||
|
void clear();
|
||||||
void initiateEncryption();//LTS
|
void initiateEncryption();//LTS
|
||||||
bool bufferStart(unsigned long tid, unsigned long pageNumber, DTSC::Meta & myMeta);
|
bool bufferStart(unsigned long tid, unsigned long pageNumber, DTSC::Meta & myMeta);
|
||||||
void bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta);
|
void bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta);
|
||||||
|
@ -62,6 +63,7 @@ namespace Mist {
|
||||||
IPC::sharedPage encryptionPage;
|
IPC::sharedPage encryptionPage;
|
||||||
|
|
||||||
void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate = false);
|
void continueNegotiate(unsigned long tid, DTSC::Meta & myMeta, bool quickNegotiate = false);
|
||||||
|
uint32_t negTimer; ///< How long we've been negotiating, in packets.
|
||||||
};
|
};
|
||||||
|
|
||||||
///\brief Class containing all basic input and output functions.
|
///\brief Class containing all basic input and output functions.
|
||||||
|
@ -70,11 +72,9 @@ namespace Mist {
|
||||||
void initiateMeta();
|
void initiateMeta();
|
||||||
bool bufferStart(unsigned long tid, unsigned long pageNumber);
|
bool bufferStart(unsigned long tid, unsigned long pageNumber);
|
||||||
void bufferNext(DTSC::Packet & pack);
|
void bufferNext(DTSC::Packet & pack);
|
||||||
void bufferNext(JSON::Value & pack);
|
|
||||||
void bufferFinalize(unsigned long tid);
|
void bufferFinalize(unsigned long tid);
|
||||||
void bufferRemove(unsigned long tid, unsigned long pageNumber);
|
void bufferRemove(unsigned long tid, unsigned long pageNumber);
|
||||||
void bufferLivePacket(JSON::Value & packet);
|
virtual void bufferLivePacket(DTSC::Packet & packet);
|
||||||
void bufferLivePacket(DTSC::Packet & packet);
|
|
||||||
protected:
|
protected:
|
||||||
void continueNegotiate(unsigned long tid, bool quickNegotiate = false);
|
void continueNegotiate(unsigned long tid, bool quickNegotiate = false);
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,17 @@ namespace Mist{
|
||||||
cfg->addOption("noinput", option);
|
cfg->addOption("noinput", option);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Output::bufferLivePacket(DTSC::Packet & packet){
|
||||||
|
if (nProxy.negTimer > 600){
|
||||||
|
WARN_MSG("No negotiation response from buffer - reconnecting.");
|
||||||
|
nProxy.clear();
|
||||||
|
reconnect();
|
||||||
|
}
|
||||||
|
InOutBase::bufferLivePacket(packet);
|
||||||
|
}
|
||||||
|
|
||||||
Output::Output(Socket::Connection & conn) : myConn(conn){
|
Output::Output(Socket::Connection & conn) : myConn(conn){
|
||||||
|
pushing = false;
|
||||||
firstTime = 0;
|
firstTime = 0;
|
||||||
crc = getpid();
|
crc = getpid();
|
||||||
parseData = false;
|
parseData = false;
|
||||||
|
@ -82,8 +92,8 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
void Output::updateMeta(){
|
void Output::updateMeta(){
|
||||||
//cancel if not alive
|
//cancel if not alive or pushing a new stream
|
||||||
if (!nProxy.userClient.isAlive()){
|
if (!nProxy.userClient.isAlive() || (isPushing() && myMeta.tracks.size())){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
//read metadata from page to myMeta variable
|
//read metadata from page to myMeta variable
|
||||||
|
@ -273,6 +283,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Output::isReadyForPlay(){
|
bool Output::isReadyForPlay(){
|
||||||
|
if (isPushing()){return true;}
|
||||||
if (myMeta.tracks.size()){
|
if (myMeta.tracks.size()){
|
||||||
if (!selectedTracks.size()){
|
if (!selectedTracks.size()){
|
||||||
selectDefaultTracks();
|
selectDefaultTracks();
|
||||||
|
@ -462,7 +473,7 @@ namespace Mist{
|
||||||
MEDIUM_MSG("Selected tracks: %s (%lu)", selected.str().c_str(), selectedTracks.size());
|
MEDIUM_MSG("Selected tracks: %s (%lu)", selected.str().c_str(), selectedTracks.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!selectedTracks.size() && myMeta.tracks.size()){
|
if (!selectedTracks.size() && myMeta.tracks.size() && capa["codecs"][bestSoFar].size()){
|
||||||
WARN_MSG("No tracks selected (%u total) for stream %s!", myMeta.tracks.size(), streamName.c_str());
|
WARN_MSG("No tracks selected (%u total) for stream %s!", myMeta.tracks.size(), streamName.c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1167,6 +1178,9 @@ namespace Mist{
|
||||||
/// Outputs used as an input should return INPUT, outputs used for automation should return OUTPUT, others should return their proper name.
|
/// Outputs used as an input should return INPUT, outputs used for automation should return OUTPUT, others should return their proper name.
|
||||||
/// The default implementation is usually good enough for all the non-INPUT types.
|
/// The default implementation is usually good enough for all the non-INPUT types.
|
||||||
std::string Output::getStatsName(){
|
std::string Output::getStatsName(){
|
||||||
|
if (isPushing()){
|
||||||
|
return "INPUT";
|
||||||
|
}
|
||||||
if (config->hasOption("target") && config->getString("target").size()){
|
if (config->hasOption("target") && config->getString("target").size()){
|
||||||
return "OUTPUT";
|
return "OUTPUT";
|
||||||
}else{
|
}else{
|
||||||
|
@ -1228,7 +1242,7 @@ namespace Mist{
|
||||||
myConn.close();
|
myConn.close();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!nProxy.trackMap.size()){
|
if (!isPushing()){
|
||||||
IPC::userConnection userConn(nProxy.userClient.getData());
|
IPC::userConnection userConn(nProxy.userClient.getData());
|
||||||
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){
|
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){
|
||||||
userConn.setTrackId(tNum, *it);
|
userConn.setTrackId(tNum, *it);
|
||||||
|
@ -1276,6 +1290,7 @@ namespace Mist{
|
||||||
/// Runs all appropriate triggers and checks.
|
/// Runs all appropriate triggers and checks.
|
||||||
/// Returns true if the push should continue, false otherwise.
|
/// Returns true if the push should continue, false otherwise.
|
||||||
bool Output::allowPush(const std::string & passwd){
|
bool Output::allowPush(const std::string & passwd){
|
||||||
|
pushing = true;
|
||||||
std::string strmSource;
|
std::string strmSource;
|
||||||
|
|
||||||
// Initialize the stream source if needed, connect to it
|
// Initialize the stream source if needed, connect to it
|
||||||
|
@ -1285,10 +1300,12 @@ namespace Mist{
|
||||||
|
|
||||||
if (!strmSource.size()){
|
if (!strmSource.size()){
|
||||||
FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str());
|
FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str());
|
||||||
|
pushing = false;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (strmSource.substr(0, 7) != "push://"){
|
if (strmSource.substr(0, 7) != "push://"){
|
||||||
FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str());
|
FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str());
|
||||||
|
pushing = false;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1317,6 +1334,7 @@ namespace Mist{
|
||||||
std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
|
std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
|
||||||
if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){
|
if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){
|
||||||
FAIL_MSG("Push from %s to %s rejected - STREAM_PUSH trigger denied the push", getConnectedHost().c_str(), streamName.c_str());
|
FAIL_MSG("Push from %s to %s rejected - STREAM_PUSH trigger denied the push", getConnectedHost().c_str(), streamName.c_str());
|
||||||
|
pushing = false;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1325,6 +1343,7 @@ namespace Mist{
|
||||||
if (IP != ""){
|
if (IP != ""){
|
||||||
if (!myConn.isAddress(IP)){
|
if (!myConn.isAddress(IP)){
|
||||||
FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str());
|
FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str());
|
||||||
|
pushing = false;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,7 @@ namespace Mist {
|
||||||
std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets.
|
std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets.
|
||||||
bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext().
|
bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext().
|
||||||
protected://these are to be messed with by child classes
|
protected://these are to be messed with by child classes
|
||||||
|
bool pushing;
|
||||||
uint64_t lastRecv;
|
uint64_t lastRecv;
|
||||||
long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending.
|
long long unsigned int firstTime;///< Time of first packet after last seek. Used for real-time sending.
|
||||||
virtual std::string getConnectedHost();
|
virtual std::string getConnectedHost();
|
||||||
|
@ -127,7 +128,9 @@ namespace Mist {
|
||||||
|
|
||||||
std::map<int,DTSCPageData> bookKeeping;
|
std::map<int,DTSCPageData> bookKeeping;
|
||||||
virtual bool isRecording(){return false;};
|
virtual bool isRecording(){return false;};
|
||||||
|
virtual bool isPushing(){return pushing;};
|
||||||
bool allowPush(const std::string & passwd);
|
bool allowPush(const std::string & passwd);
|
||||||
|
void bufferLivePacket(DTSC::Packet & packet);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,6 @@ namespace Mist {
|
||||||
Bit::htobl(sSize, prep.packedSize());
|
Bit::htobl(sSize, prep.packedSize());
|
||||||
myConn.SendNow(sSize, 4);
|
myConn.SendNow(sSize, 4);
|
||||||
prep.sendTo(myConn);
|
prep.sendTo(myConn);
|
||||||
pushing = false;
|
|
||||||
lastActive = Util::epoch();
|
lastActive = Util::epoch();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,9 +173,7 @@ namespace Mist {
|
||||||
streamName = dScan.getMember("stream").asString();
|
streamName = dScan.getMember("stream").asString();
|
||||||
std::string passString = dScan.getMember("password").asString();
|
std::string passString = dScan.getMember("password").asString();
|
||||||
Util::sanitizeName(streamName);
|
Util::sanitizeName(streamName);
|
||||||
pushing = true;
|
|
||||||
if (!allowPush(passString)){
|
if (!allowPush(passString)){
|
||||||
pushing = false;
|
|
||||||
myConn.close();
|
myConn.close();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@ namespace Mist {
|
||||||
unsigned int lastActive;///<Time of last sending of data.
|
unsigned int lastActive;///<Time of last sending of data.
|
||||||
std::string getStatsName();
|
std::string getStatsName();
|
||||||
std::string salt;
|
std::string salt;
|
||||||
bool pushing;
|
|
||||||
void handlePush(DTSC::Scan & dScan);
|
void handlePush(DTSC::Scan & dScan);
|
||||||
void handlePlay(DTSC::Scan & dScan);
|
void handlePlay(DTSC::Scan & dScan);
|
||||||
unsigned long long fastAsPossibleTime;
|
unsigned long long fastAsPossibleTime;
|
||||||
|
|
|
@ -53,9 +53,9 @@ namespace Mist {
|
||||||
Output::onFail();
|
Output::onFail();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// We assume it's ready to play if there is at least one track available
|
/// The HTTP output is always ready to play
|
||||||
bool OutHTTP::isReadyForPlay() {
|
bool OutHTTP::isReadyForPlay() {
|
||||||
return myMeta.tracks.size();
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void OutHTTP::init(Util::Config * cfg){
|
void OutHTTP::init(Util::Config * cfg){
|
||||||
|
@ -68,7 +68,7 @@ namespace Mist {
|
||||||
capa["url_match"].append("/crossdomain.xml");
|
capa["url_match"].append("/crossdomain.xml");
|
||||||
capa["url_match"].append("/clientaccesspolicy.xml");
|
capa["url_match"].append("/clientaccesspolicy.xml");
|
||||||
capa["url_match"].append("/$.html");
|
capa["url_match"].append("/$.html");
|
||||||
capa["url_match"].append("/$.ico");
|
capa["url_match"].append("/favicon.ico");
|
||||||
capa["url_match"].append("/$.smil");
|
capa["url_match"].append("/$.smil");
|
||||||
capa["url_match"].append("/info_$.js");
|
capa["url_match"].append("/info_$.js");
|
||||||
capa["url_match"].append("/json_$.js");
|
capa["url_match"].append("/json_$.js");
|
||||||
|
|
|
@ -10,7 +10,6 @@
|
||||||
|
|
||||||
namespace Mist {
|
namespace Mist {
|
||||||
OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) {
|
OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) {
|
||||||
isPushing = false;
|
|
||||||
maxbps = config->getInteger("maxkbps")*128;
|
maxbps = config->getInteger("maxkbps")*128;
|
||||||
if (config->getString("target").size() && config->getString("target").substr(0, 7) == "rtmp://"){
|
if (config->getString("target").size() && config->getString("target").substr(0, 7) == "rtmp://"){
|
||||||
streamName = config->getString("streamname");
|
streamName = config->getString("streamname");
|
||||||
|
@ -162,21 +161,6 @@ namespace Mist {
|
||||||
return !(config->getString("target").size());
|
return !(config->getString("target").size());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool OutRTMP::isReadyForPlay(){
|
|
||||||
if (isPushing){
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return Output::isReadyForPlay();
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string OutRTMP::getStatsName(){
|
|
||||||
if (isPushing){
|
|
||||||
return "INPUT";
|
|
||||||
}else{
|
|
||||||
return Output::getStatsName();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool OutRTMP::onFinish(){
|
bool OutRTMP::onFinish(){
|
||||||
MEDIUM_MSG("Finishing stream %s, %s", streamName.c_str(), myConn?"while connected":"already disconnected");
|
MEDIUM_MSG("Finishing stream %s, %s", streamName.c_str(), myConn?"while connected":"already disconnected");
|
||||||
if (myConn){
|
if (myConn){
|
||||||
|
@ -778,9 +762,7 @@ namespace Mist {
|
||||||
|
|
||||||
Util::sanitizeName(streamName);
|
Util::sanitizeName(streamName);
|
||||||
|
|
||||||
isPushing = true;
|
|
||||||
if (!allowPush(app_name)){
|
if (!allowPush(app_name)){
|
||||||
isPushing = false;
|
|
||||||
onFinish();
|
onFinish();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,20 +13,17 @@ namespace Mist {
|
||||||
void onRequest();
|
void onRequest();
|
||||||
void sendNext();
|
void sendNext();
|
||||||
void sendHeader();
|
void sendHeader();
|
||||||
bool isReadyForPlay();
|
|
||||||
static bool listenMode();
|
static bool listenMode();
|
||||||
void requestHandler();
|
void requestHandler();
|
||||||
bool onFinish();
|
bool onFinish();
|
||||||
protected:
|
protected:
|
||||||
uint64_t rtmpOffset;
|
uint64_t rtmpOffset;
|
||||||
bool isPushing;
|
|
||||||
unsigned int maxbps;
|
unsigned int maxbps;
|
||||||
void parseVars(std::string data);
|
void parseVars(std::string data);
|
||||||
std::string app_name;
|
std::string app_name;
|
||||||
void parseChunk(Socket::Buffer & inputBuffer);
|
void parseChunk(Socket::Buffer & inputBuffer);
|
||||||
void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);
|
void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);
|
||||||
void sendCommand(AMF::Object & amfReply, int messageType, int streamId);
|
void sendCommand(AMF::Object & amfReply, int messageType, int streamId);
|
||||||
std::string getStatsName();
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ namespace Mist {
|
||||||
maxSkipAhead = 0;
|
maxSkipAhead = 0;
|
||||||
minSkipAhead = 0;
|
minSkipAhead = 0;
|
||||||
expectTCP = false;
|
expectTCP = false;
|
||||||
isPushing = false;
|
|
||||||
lastTimeSync = 0;
|
lastTimeSync = 0;
|
||||||
mainConn = &myConn;
|
mainConn = &myConn;
|
||||||
}
|
}
|
||||||
|
@ -76,13 +75,6 @@ namespace Mist {
|
||||||
config = cfg;
|
config = cfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool OutRTSP::isReadyForPlay(){
|
|
||||||
if (isPushing){
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return Output::isReadyForPlay();
|
|
||||||
}
|
|
||||||
|
|
||||||
void OutRTSP::sendNext(){
|
void OutRTSP::sendNext(){
|
||||||
char * dataPointer = 0;
|
char * dataPointer = 0;
|
||||||
unsigned int dataLen = 0;
|
unsigned int dataLen = 0;
|
||||||
|
@ -152,14 +144,6 @@ namespace Mist {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string OutRTSP::getStatsName(){
|
|
||||||
if (isPushing){
|
|
||||||
return "INPUT";
|
|
||||||
}else{
|
|
||||||
return Output::getStatsName();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This request handler also checks for UDP packets
|
/// This request handler also checks for UDP packets
|
||||||
void OutRTSP::requestHandler(){
|
void OutRTSP::requestHandler(){
|
||||||
if (!expectTCP){
|
if (!expectTCP){
|
||||||
|
@ -277,7 +261,7 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//might be push setup - check known control points
|
//might be push setup - check known control points
|
||||||
if (isPushing && tracks.size()){
|
if (pushing && tracks.size()){
|
||||||
bool setupHandled = false;
|
bool setupHandled = false;
|
||||||
for (std::map<int, RTPTrack>::iterator it = tracks.begin(); it != tracks.end(); ++it){
|
for (std::map<int, RTPTrack>::iterator it = tracks.begin(); it != tracks.end(); ++it){
|
||||||
if (it->second.control.size() && (HTTP_R.url.find(it->second.control) != std::string::npos || HTTP_R.GetVar("pass").find(it->second.control) != std::string::npos)){
|
if (it->second.control.size() && (HTTP_R.url.find(it->second.control) != std::string::npos || HTTP_R.GetVar("pass").find(it->second.control) != std::string::npos)){
|
||||||
|
@ -304,7 +288,7 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
FAIL_MSG("Could not handle setup: pushing=%s, trackSize=%u", isPushing?"true":"false", tracks.size());
|
FAIL_MSG("Could not handle setup: pushing=%s, trackSize=%u", pushing?"true":"false", tracks.size());
|
||||||
}
|
}
|
||||||
if (HTTP_R.method == "PLAY"){
|
if (HTTP_R.method == "PLAY"){
|
||||||
initialSeek();
|
initialSeek();
|
||||||
|
@ -359,9 +343,7 @@ namespace Mist {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (HTTP_R.method == "ANNOUNCE"){
|
if (HTTP_R.method == "ANNOUNCE"){
|
||||||
isPushing = true;
|
|
||||||
if (!allowPush(HTTP_R.GetVar("pass"))){
|
if (!allowPush(HTTP_R.GetVar("pass"))){
|
||||||
isPushing = false;
|
|
||||||
onFinish();
|
onFinish();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -155,10 +155,8 @@ namespace Mist {
|
||||||
void sendNext();
|
void sendNext();
|
||||||
void onRequest();
|
void onRequest();
|
||||||
void requestHandler();
|
void requestHandler();
|
||||||
bool isReadyForPlay();
|
|
||||||
bool onFinish();
|
bool onFinish();
|
||||||
private:
|
private:
|
||||||
bool isPushing;
|
|
||||||
void parseSDP(const std::string & sdp);
|
void parseSDP(const std::string & sdp);
|
||||||
long long connectedAt;///< The timestamp the connection was made, as reference point for RTCP packets.
|
long long connectedAt;///< The timestamp the connection was made, as reference point for RTCP packets.
|
||||||
std::map<int, RTPTrack> tracks;///< List of selected tracks with RTSP-specific session data.
|
std::map<int, RTPTrack> tracks;///< List of selected tracks with RTSP-specific session data.
|
||||||
|
@ -173,7 +171,6 @@ namespace Mist {
|
||||||
void h264MultiParse(uint64_t ts, const uint64_t track, char * buffer, const uint32_t len);
|
void h264MultiParse(uint64_t ts, const uint64_t track, char * buffer, const uint32_t len);
|
||||||
void h264Packet(uint64_t ts, const uint64_t track, const char * buffer, const uint32_t len, bool isKey);
|
void h264Packet(uint64_t ts, const uint64_t track, const char * buffer, const uint32_t len, bool isKey);
|
||||||
void updateH264Init(uint64_t trackNo);
|
void updateH264Init(uint64_t trackNo);
|
||||||
std::string getStatsName();
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue