Updated generic HTTP output to support websockets. Added basic websocket JSON push support.

This commit is contained in:
Thulinma 2018-07-04 11:36:57 +02:00
parent 43934cf69c
commit 9104d68a5d
4 changed files with 156 additions and 35 deletions

View file

@ -6,6 +6,9 @@
namespace Mist { namespace Mist {
HTTPOutput::HTTPOutput(Socket::Connection & conn) : Output(conn) { HTTPOutput::HTTPOutput(Socket::Connection & conn) : Output(conn) {
webSock = 0;
idleInterval = 0;
idleLast = 0;
if (config->getString("ip").size()){ if (config->getString("ip").size()){
myConn.setHost(config->getString("ip")); myConn.setHost(config->getString("ip"));
} }
@ -15,6 +18,13 @@ namespace Mist {
config->activate(); config->activate();
} }
HTTPOutput::~HTTPOutput() {
if (webSock){
delete webSock;
webSock = 0;
}
}
void HTTPOutput::init(Util::Config * cfg){ void HTTPOutput::init(Util::Config * cfg){
Output::init(cfg); Output::init(cfg);
capa["deps"] = "HTTP"; capa["deps"] = "HTTP";
@ -153,6 +163,21 @@ namespace Mist {
} }
void HTTPOutput::requestHandler(){ void HTTPOutput::requestHandler(){
if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){
onIdle();
idleLast = Util::bootMS();
}
if (webSock){
if (webSock->readFrame()){
onWebsocketFrame();
idleLast = Util::bootMS();
}else{
if (!isBlocking && !parseData){
Util::sleep(100);
}
}
return;
}
if (myConn.Received().size() && myConn.spool()){ if (myConn.Received().size() && myConn.spool()){
DEBUG_MSG(DLVL_DONTEVEN, "onRequest"); DEBUG_MSG(DLVL_DONTEVEN, "onRequest");
onRequest(); onRequest();
@ -197,12 +222,14 @@ namespace Mist {
onRequest(); onRequest();
} }
if (!myConn.Received().size()){ if (!myConn.Received().size()){
Util::sleep(500); if (!isBlocking && !parseData){
Util::sleep(100);
}
} }
} }
}else{ }else{
if (!isBlocking && !parseData){ if (!isBlocking && !parseData){
Util::sleep(500); Util::sleep(100);
} }
} }
} }
@ -224,8 +251,23 @@ namespace Mist {
} }
INFO_MSG("Received request %s", H.getUrl().c_str()); INFO_MSG("Received request %s", H.getUrl().c_str());
//Handle upgrade to websocket if the output supports it
if (doesWebsockets() && H.GetHeader("Upgrade") == "websocket"){
INFO_MSG("Switching to Websocket mode");
preWebsocketConnect();
webSock = new HTTP::Websocket(myConn, H);
if (!(*webSock)){
delete webSock;
webSock = 0;
return;
}
onWebsocketConnect();
H.Clean();
return;
}
preHTTP(); preHTTP();
onHTTP(); onHTTP();
idleLast = Util::bootMS();
if (!H.bufferChunks){ if (!H.bufferChunks){
H.Clean(); H.Clean();
} }

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <mist/defines.h> #include <mist/defines.h>
#include <mist/http_parser.h> #include <mist/http_parser.h>
#include <mist/websocket.h>
#include "output.h" #include "output.h"
namespace Mist { namespace Mist {
@ -8,18 +9,26 @@ namespace Mist {
class HTTPOutput : public Output { class HTTPOutput : public Output {
public: public:
HTTPOutput(Socket::Connection & conn); HTTPOutput(Socket::Connection & conn);
virtual ~HTTPOutput(){}; virtual ~HTTPOutput();
static void init(Util::Config * cfg); static void init(Util::Config * cfg);
void onRequest(); void onRequest();
virtual void onFail(); virtual void onFail();
virtual void onHTTP(){}; virtual void onHTTP(){};
virtual void onIdle(){};
virtual void onWebsocketFrame(){};
virtual void onWebsocketConnect(){};
virtual void preWebsocketConnect(){};
virtual void requestHandler(); virtual void requestHandler();
virtual void preHTTP(); virtual void preHTTP();
static bool listenMode(){return false;} static bool listenMode(){return false;}
virtual bool doesWebsockets(){return false;}
void reConnector(std::string & connector); void reConnector(std::string & connector);
std::string getHandler(); std::string getHandler();
bool parseRange(uint64_t & byteStart, uint64_t & byteEnd); bool parseRange(uint64_t & byteStart, uint64_t & byteEnd);
protected: protected:
HTTP::Parser H; HTTP::Parser H;
HTTP::Websocket * webSock;
uint32_t idleInterval;
uint64_t idleLast;
}; };
} }

View file

@ -4,16 +4,11 @@
namespace Mist { namespace Mist {
OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){ OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){
ws = 0;
realTime = 0; realTime = 0;
bootMsOffset = 0;
keepReselecting = false; keepReselecting = false;
dupcheck = false; dupcheck = false;
} noReceive = false;
OutJSON::~OutJSON() {
if (ws){
delete ws;
ws = 0;
}
} }
void OutJSON::init(Util::Config * cfg){ void OutJSON::init(Util::Config * cfg){
@ -40,8 +35,8 @@ namespace Mist {
} }
lastVal = jPack; lastVal = jPack;
} }
if (ws){ if (webSock){
ws->sendFrame(jPack.toString()); webSock->sendFrame(jPack.toString());
return; return;
} }
if (!jsonp.size()){ if (!jsonp.size()){
@ -83,7 +78,7 @@ namespace Mist {
static bool recursive = false; static bool recursive = false;
if (recursive){return true;} if (recursive){return true;}
recursive = true; recursive = true;
if (keepReselecting){ if (keepReselecting && !isPushing()){
uint64_t maxTimer = 7200; uint64_t maxTimer = 7200;
while (--maxTimer && nProxy.userClient.isAlive() && keepGoing()){ while (--maxTimer && nProxy.userClient.isAlive() && keepGoing()){
Util::wait(500); Util::wait(500);
@ -99,18 +94,23 @@ namespace Mist {
} }
} }
recursive = false; recursive = false;
return false;
} }
if (!jsonp.size() && !first){ if (!jsonp.size() && !first){
myConn.SendNow("]);\n\n", 5); myConn.SendNow("]\n", 2);
} }
myConn.close(); myConn.close();
return false; return false;
} }
void OutJSON::onHTTP(){ void OutJSON::onWebsocketConnect(){
std::string method = H.method; sentHeader = true;
jsonp = ""; parseData = !noReceive;
}
void OutJSON::preWebsocketConnect(){
if (H.GetVar("password") != ""){pushPass = H.GetVar("password");}
if (H.GetVar("password").size() || H.GetVar("push").size()){noReceive = true;}
if (H.GetVar("persist") != ""){keepReselecting = true;} if (H.GetVar("persist") != ""){keepReselecting = true;}
if (H.GetVar("dedupe") != ""){ if (H.GetVar("dedupe") != ""){
dupcheck = true; dupcheck = true;
@ -126,22 +126,84 @@ namespace Mist {
} }
} }
} }
}
void OutJSON::onWebsocketFrame(){
if (!isPushing()){
if (!allowPush(pushPass)){
onFinish();
return;
}
}
if (!bootMsOffset){
if (myMeta.bootMsOffset){
bootMsOffset = myMeta.bootMsOffset;
}else{
bootMsOffset = Util::bootMS();
}
}
//We now know we're allowed to push. Read a JSON object.
JSON::Value inJSON = JSON::fromString(webSock->data, webSock->data.size());
if (!inJSON || !inJSON.isObject()){
//Ignore empty and/or non-parsable JSON packets
MEDIUM_MSG("Ignoring non-JSON object: %s", webSock->data);
return;
}
//Let's create a new track for pushing purposes, if needed
if (!pushTrack){
pushTrack = 1;
while (myMeta.tracks.count(pushTrack)){
++pushTrack;
}
}
myMeta.tracks[pushTrack].type = "meta";
myMeta.tracks[pushTrack].codec = "JSON";
//We have a track set correctly. Let's attempt to buffer a frame.
inJSON["trackid"] = (long long)pushTrack;
inJSON["datatype"] = "meta";
lastSendTime = Util::bootMS();
if (!inJSON.isMember("unix")){
//Base timestamp on arrival time
inJSON["time"] = (long long)(lastSendTime - bootMsOffset);
}else{
//Base timestamp on unix time
inJSON["time"] = (long long)((lastSendTime - bootMsOffset) + (Util::epoch() - Util::bootSecs()) * 1000);
}
inJSON["bmo"] = (long long)bootMsOffset;
lastVal = inJSON;
std::string packedJson = inJSON.toNetPacked();
DTSC::Packet newPack(packedJson.data(), packedJson.size(), true);
bufferLivePacket(newPack);
if (!idleInterval){idleInterval = 100;}
if (isBlocking){setBlocking(false);}
}
/// Repeats last JSON packet every 5 seconds to keep stream alive.
void OutJSON::onIdle(){
if (nProxy.trackState[pushTrack] != FILL_ACC){
continueNegotiate(pushTrack);
if (nProxy.trackState[pushTrack] == FILL_ACC){
idleInterval = 5000;
}
return;
}
lastVal["time"] = (long long)(lastVal["time"].asInt() + (Util::bootMS() - lastSendTime));
lastSendTime = Util::bootMS();
lastVal.netPrepare();
std::string packedJson = lastVal.toNetPacked();
DTSC::Packet newPack(packedJson.data(), packedJson.size(), true);
myMeta.tracks[pushTrack].type = "meta";
myMeta.tracks[pushTrack].codec = "JSON";
bufferLivePacket(newPack);
}
void OutJSON::onHTTP(){
std::string method = H.method;
preWebsocketConnect();//Not actually a websocket, but we need to do the same checks
jsonp = "";
if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");} if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");}
if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");} if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");}
if (H.GetHeader("Upgrade") == "websocket"){
ws = new HTTP::Websocket(myConn, H);
if (!(*ws)){
delete ws;
ws = 0;
return;
}
sentHeader = true;
parseData = true;
wantRequest = false;
return;
}
H.Clean(); H.Clean();
H.setCORSHeaders(); H.setCORSHeaders();
if(method == "OPTIONS" || method == "HEAD"){ if(method == "OPTIONS" || method == "HEAD"){

View file

@ -5,21 +5,29 @@ namespace Mist {
class OutJSON : public HTTPOutput { class OutJSON : public HTTPOutput {
public: public:
OutJSON(Socket::Connection & conn); OutJSON(Socket::Connection & conn);
~OutJSON();
static void init(Util::Config * cfg); static void init(Util::Config * cfg);
void onHTTP(); void onHTTP();
void onIdle();
virtual void onWebsocketFrame();
virtual void onWebsocketConnect();
virtual void preWebsocketConnect();
bool onFinish(); bool onFinish();
void onFail(); void onFail();
void sendNext(); void sendNext();
void sendHeader(); void sendHeader();
bool doesWebsockets(){return true;}
protected: protected:
JSON::Value lastVal; JSON::Value lastVal;
uint64_t lastSendTime;
bool keepReselecting; bool keepReselecting;
std::string jsonp; std::string jsonp;
std::string pushPass;
uint64_t pushTrack;
int64_t bootMsOffset;
bool dupcheck; bool dupcheck;
std::set<std::string> nodup; std::set<std::string> nodup;
bool first; bool first;
HTTP::Websocket * ws; bool noReceive;
}; };
} }