Updated generic HTTP output to support websockets. Added basic websocket JSON push support.
This commit is contained in:
parent
2a028fa309
commit
921e6827dc
4 changed files with 156 additions and 35 deletions
|
@ -8,12 +8,22 @@
|
|||
|
||||
namespace Mist {
|
||||
HTTPOutput::HTTPOutput(Socket::Connection & conn) : Output(conn) {
|
||||
webSock = 0;
|
||||
idleInterval = 0;
|
||||
idleLast = 0;
|
||||
if (config->getString("ip").size()){
|
||||
myConn.setHost(config->getString("ip"));
|
||||
}
|
||||
config->activate();
|
||||
}
|
||||
|
||||
HTTPOutput::~HTTPOutput() {
|
||||
if (webSock){
|
||||
delete webSock;
|
||||
webSock = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void HTTPOutput::init(Util::Config * cfg){
|
||||
Output::init(cfg);
|
||||
capa["deps"] = "HTTP";
|
||||
|
@ -152,6 +162,21 @@ namespace Mist {
|
|||
}
|
||||
|
||||
void HTTPOutput::requestHandler(){
|
||||
if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){
|
||||
onIdle();
|
||||
idleLast = Util::bootMS();
|
||||
}
|
||||
if (webSock){
|
||||
if (webSock->readFrame()){
|
||||
onWebsocketFrame();
|
||||
idleLast = Util::bootMS();
|
||||
}else{
|
||||
if (!isBlocking && !parseData){
|
||||
Util::sleep(100);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (myConn.Received().size() && myConn.spool()){
|
||||
DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
|
||||
onRequest();
|
||||
|
@ -199,12 +224,14 @@ namespace Mist {
|
|||
onRequest();
|
||||
}
|
||||
if (!myConn.Received().size()){
|
||||
Util::sleep(500);
|
||||
if (!isBlocking && !parseData){
|
||||
Util::sleep(100);
|
||||
}
|
||||
}
|
||||
}
|
||||
}else{
|
||||
if (!isBlocking && !parseData){
|
||||
Util::sleep(500);
|
||||
Util::sleep(100);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -232,8 +259,23 @@ namespace Mist {
|
|||
if (H.GetVar("audio") != ""){targetParams["audio"] = H.GetVar("audio");}
|
||||
if (H.GetVar("video") != ""){targetParams["video"] = H.GetVar("video");}
|
||||
if (H.GetVar("subtitle") != ""){targetParams["subtitle"] = H.GetVar("subtitle");}
|
||||
//Handle upgrade to websocket if the output supports it
|
||||
if (doesWebsockets() && H.GetHeader("Upgrade") == "websocket"){
|
||||
INFO_MSG("Switching to Websocket mode");
|
||||
preWebsocketConnect();
|
||||
webSock = new HTTP::Websocket(myConn, H);
|
||||
if (!(*webSock)){
|
||||
delete webSock;
|
||||
webSock = 0;
|
||||
return;
|
||||
}
|
||||
onWebsocketConnect();
|
||||
H.Clean();
|
||||
return;
|
||||
}
|
||||
preHTTP();
|
||||
onHTTP();
|
||||
idleLast = Util::bootMS();
|
||||
if (!H.bufferChunks){
|
||||
H.Clean();
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#pragma once
|
||||
#include <mist/defines.h>
|
||||
#include <mist/http_parser.h>
|
||||
#include <mist/websocket.h>
|
||||
#include "output.h"
|
||||
|
||||
namespace Mist {
|
||||
|
@ -8,19 +9,27 @@ namespace Mist {
|
|||
class HTTPOutput : public Output {
|
||||
public:
|
||||
HTTPOutput(Socket::Connection & conn);
|
||||
virtual ~HTTPOutput(){};
|
||||
virtual ~HTTPOutput();
|
||||
static void init(Util::Config * cfg);
|
||||
void onRequest();
|
||||
virtual void onFail();
|
||||
virtual void onHTTP(){};
|
||||
virtual void onIdle(){};
|
||||
virtual void onWebsocketFrame(){};
|
||||
virtual void onWebsocketConnect(){};
|
||||
virtual void preWebsocketConnect(){};
|
||||
virtual void requestHandler();
|
||||
virtual void preHTTP();
|
||||
static bool listenMode(){return false;}
|
||||
virtual bool doesWebsockets(){return false;}
|
||||
void reConnector(std::string & connector);
|
||||
std::string getHandler();
|
||||
bool parseRange(uint64_t & byteStart, uint64_t & byteEnd);
|
||||
protected:
|
||||
HTTP::Parser H;
|
||||
HTTP::Websocket * webSock;
|
||||
uint32_t idleInterval;
|
||||
uint64_t idleLast;
|
||||
std::string getConnectedHost();//LTS
|
||||
std::string getConnectedBinHost();//LTS
|
||||
bool isTrustedProxy(const std::string & ip);//LTS
|
||||
|
|
|
@ -4,16 +4,11 @@
|
|||
|
||||
namespace Mist {
|
||||
OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){
|
||||
ws = 0;
|
||||
realTime = 0;
|
||||
bootMsOffset = 0;
|
||||
keepReselecting = false;
|
||||
dupcheck = false;
|
||||
}
|
||||
OutJSON::~OutJSON() {
|
||||
if (ws){
|
||||
delete ws;
|
||||
ws = 0;
|
||||
}
|
||||
noReceive = false;
|
||||
}
|
||||
|
||||
void OutJSON::init(Util::Config * cfg){
|
||||
|
@ -40,8 +35,8 @@ namespace Mist {
|
|||
}
|
||||
lastVal = jPack;
|
||||
}
|
||||
if (ws){
|
||||
ws->sendFrame(jPack.toString());
|
||||
if (webSock){
|
||||
webSock->sendFrame(jPack.toString());
|
||||
return;
|
||||
}
|
||||
if (!jsonp.size()){
|
||||
|
@ -83,7 +78,7 @@ namespace Mist {
|
|||
static bool recursive = false;
|
||||
if (recursive){return true;}
|
||||
recursive = true;
|
||||
if (keepReselecting){
|
||||
if (keepReselecting && !isPushing()){
|
||||
uint64_t maxTimer = 7200;
|
||||
while (--maxTimer && nProxy.userClient.isAlive() && keepGoing()){
|
||||
Util::wait(500);
|
||||
|
@ -99,18 +94,23 @@ namespace Mist {
|
|||
}
|
||||
}
|
||||
recursive = false;
|
||||
return false;
|
||||
}
|
||||
if (!jsonp.size() && !first){
|
||||
myConn.SendNow("]);\n\n", 5);
|
||||
myConn.SendNow("]\n", 2);
|
||||
}
|
||||
myConn.close();
|
||||
return false;
|
||||
}
|
||||
|
||||
void OutJSON::onHTTP(){
|
||||
std::string method = H.method;
|
||||
jsonp = "";
|
||||
void OutJSON::onWebsocketConnect(){
|
||||
sentHeader = true;
|
||||
parseData = !noReceive;
|
||||
}
|
||||
|
||||
void OutJSON::preWebsocketConnect(){
|
||||
if (H.GetVar("password") != ""){pushPass = H.GetVar("password");}
|
||||
if (H.GetVar("password").size() || H.GetVar("push").size()){noReceive = true;}
|
||||
|
||||
if (H.GetVar("persist") != ""){keepReselecting = true;}
|
||||
if (H.GetVar("dedupe") != ""){
|
||||
dupcheck = true;
|
||||
|
@ -126,21 +126,83 @@ namespace Mist {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");}
|
||||
if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");}
|
||||
}
|
||||
|
||||
if (H.GetHeader("Upgrade") == "websocket"){
|
||||
ws = new HTTP::Websocket(myConn, H);
|
||||
if (!(*ws)){
|
||||
delete ws;
|
||||
ws = 0;
|
||||
void OutJSON::onWebsocketFrame(){
|
||||
if (!isPushing()){
|
||||
if (!allowPush(pushPass)){
|
||||
onFinish();
|
||||
return;
|
||||
}
|
||||
sentHeader = true;
|
||||
parseData = true;
|
||||
wantRequest = false;
|
||||
}
|
||||
if (!bootMsOffset){
|
||||
if (myMeta.bootMsOffset){
|
||||
bootMsOffset = myMeta.bootMsOffset;
|
||||
}else{
|
||||
bootMsOffset = Util::bootMS();
|
||||
}
|
||||
}
|
||||
//We now know we're allowed to push. Read a JSON object.
|
||||
JSON::Value inJSON = JSON::fromString(webSock->data, webSock->data.size());
|
||||
if (!inJSON || !inJSON.isObject()){
|
||||
//Ignore empty and/or non-parsable JSON packets
|
||||
MEDIUM_MSG("Ignoring non-JSON object: %s", webSock->data);
|
||||
return;
|
||||
}
|
||||
//Let's create a new track for pushing purposes, if needed
|
||||
if (!pushTrack){
|
||||
pushTrack = 1;
|
||||
while (myMeta.tracks.count(pushTrack)){
|
||||
++pushTrack;
|
||||
}
|
||||
}
|
||||
myMeta.tracks[pushTrack].type = "meta";
|
||||
myMeta.tracks[pushTrack].codec = "JSON";
|
||||
//We have a track set correctly. Let's attempt to buffer a frame.
|
||||
inJSON["trackid"] = (long long)pushTrack;
|
||||
inJSON["datatype"] = "meta";
|
||||
lastSendTime = Util::bootMS();
|
||||
if (!inJSON.isMember("unix")){
|
||||
//Base timestamp on arrival time
|
||||
inJSON["time"] = (long long)(lastSendTime - bootMsOffset);
|
||||
}else{
|
||||
//Base timestamp on unix time
|
||||
inJSON["time"] = (long long)((lastSendTime - bootMsOffset) + (Util::epoch() - Util::bootSecs()) * 1000);
|
||||
}
|
||||
inJSON["bmo"] = (long long)bootMsOffset;
|
||||
lastVal = inJSON;
|
||||
std::string packedJson = inJSON.toNetPacked();
|
||||
DTSC::Packet newPack(packedJson.data(), packedJson.size(), true);
|
||||
bufferLivePacket(newPack);
|
||||
if (!idleInterval){idleInterval = 100;}
|
||||
if (isBlocking){setBlocking(false);}
|
||||
}
|
||||
|
||||
/// Repeats last JSON packet every 5 seconds to keep stream alive.
|
||||
void OutJSON::onIdle(){
|
||||
if (nProxy.trackState[pushTrack] != FILL_ACC){
|
||||
continueNegotiate(pushTrack);
|
||||
if (nProxy.trackState[pushTrack] == FILL_ACC){
|
||||
idleInterval = 5000;
|
||||
}
|
||||
return;
|
||||
}
|
||||
lastVal["time"] = (long long)(lastVal["time"].asInt() + (Util::bootMS() - lastSendTime));
|
||||
lastSendTime = Util::bootMS();
|
||||
lastVal.netPrepare();
|
||||
std::string packedJson = lastVal.toNetPacked();
|
||||
DTSC::Packet newPack(packedJson.data(), packedJson.size(), true);
|
||||
myMeta.tracks[pushTrack].type = "meta";
|
||||
myMeta.tracks[pushTrack].codec = "JSON";
|
||||
bufferLivePacket(newPack);
|
||||
}
|
||||
|
||||
void OutJSON::onHTTP(){
|
||||
std::string method = H.method;
|
||||
preWebsocketConnect();//Not actually a websocket, but we need to do the same checks
|
||||
jsonp = "";
|
||||
if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");}
|
||||
if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");}
|
||||
|
||||
H.Clean();
|
||||
H.setCORSHeaders();
|
||||
|
|
|
@ -5,21 +5,29 @@ namespace Mist {
|
|||
class OutJSON : public HTTPOutput {
|
||||
public:
|
||||
OutJSON(Socket::Connection & conn);
|
||||
~OutJSON();
|
||||
static void init(Util::Config * cfg);
|
||||
void onHTTP();
|
||||
void onIdle();
|
||||
virtual void onWebsocketFrame();
|
||||
virtual void onWebsocketConnect();
|
||||
virtual void preWebsocketConnect();
|
||||
bool onFinish();
|
||||
void onFail();
|
||||
void sendNext();
|
||||
void sendHeader();
|
||||
bool doesWebsockets(){return true;}
|
||||
protected:
|
||||
JSON::Value lastVal;
|
||||
uint64_t lastSendTime;
|
||||
bool keepReselecting;
|
||||
std::string jsonp;
|
||||
std::string pushPass;
|
||||
uint64_t pushTrack;
|
||||
int64_t bootMsOffset;
|
||||
bool dupcheck;
|
||||
std::set<std::string> nodup;
|
||||
bool first;
|
||||
HTTP::Websocket * ws;
|
||||
bool noReceive;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue