DTSC push output support, fixes for DTSC push input and DTSC pull output

This commit is contained in:
Thulinma 2020-08-30 17:01:12 +02:00
parent 77aa90d48c
commit ea49344628
4 changed files with 195 additions and 104 deletions

View file

@ -969,6 +969,7 @@ namespace DTSC{
streamInit(); streamInit();
setVod(src.hasMember("vod") && src.getMember("vod").asInt()); setVod(src.hasMember("vod") && src.getMember("vod").asInt());
setLive(src.hasMember("live") && src.getMember("live").asInt());
version = src.getMember("version").asInt(); version = src.getMember("version").asInt();
@ -978,98 +979,108 @@ namespace DTSC{
size_t tNum = src.getMember("tracks").getSize(); size_t tNum = src.getMember("tracks").getSize();
for (int i = 0; i < tNum; i++){ for (int i = 0; i < tNum; i++){
DTSC::Scan trak = src.getMember("tracks").getIndice(i); addTrackFrom(src.getMember("tracks").getIndice(i));
}
}
char *fragStor; void Meta::addTrackFrom(const DTSC::Scan &trak){
char *keyStor; char *fragStor = 0;
char *partStor; char *keyStor = 0;
char *keySizeStor; char *partStor = 0;
size_t fragLen; char *keySizeStor = 0;
size_t keyLen; size_t fragLen = 0;
size_t partLen; size_t keyLen = 0;
size_t keySizeLen; size_t partLen = 0;
size_t keySizeLen = 0;
uint32_t fragCount = DEFAULT_FRAGMENT_COUNT;
uint32_t keyCount = DEFAULT_KEY_COUNT;
uint32_t partCount = DEFAULT_PART_COUNT;
if (trak.hasMember("fragments") && trak.hasMember("keys") && trak.hasMember("parts") && trak.hasMember("keysizes")){
trak.getMember("fragments").getString(fragStor, fragLen); trak.getMember("fragments").getString(fragStor, fragLen);
trak.getMember("keys").getString(keyStor, keyLen); trak.getMember("keys").getString(keyStor, keyLen);
trak.getMember("parts").getString(partStor, partLen); trak.getMember("parts").getString(partStor, partLen);
trak.getMember("keysizes").getString(keySizeStor, keySizeLen); trak.getMember("keysizes").getString(keySizeStor, keySizeLen);
uint32_t fragCount = fragLen / DTSH_FRAGMENT_SIZE; fragCount = fragLen / DTSH_FRAGMENT_SIZE;
uint32_t keyCount = keyLen / DTSH_KEY_SIZE; keyCount = keyLen / DTSH_KEY_SIZE;
uint32_t partCount = partLen / DTSH_PART_SIZE; partCount = partLen / DTSH_PART_SIZE;
size_t tIdx = addTrack(fragCount ? fragCount : DEFAULT_FRAGMENT_COUNT, keyCount ? keyCount : DEFAULT_KEY_COUNT,
partCount ? partCount : DEFAULT_PART_COUNT);
setType(tIdx, trak.getMember("type").asString());
setCodec(tIdx, trak.getMember("codec").asString());
setInit(tIdx, trak.getMember("init").asString());
setID(tIdx, trak.getMember("trackid").asInt());
setFirstms(tIdx, trak.getMember("firstms").asInt());
setLastms(tIdx, trak.getMember("lastms").asInt());
setBps(tIdx, trak.getMember("bps").asInt());
setMaxBps(tIdx, trak.getMember("maxbps").asInt());
setSourceTrack(tIdx, INVALID_TRACK_ID);
if (trak.getMember("type").asString() == "video"){
setWidth(tIdx, trak.getMember("width").asInt());
setHeight(tIdx, trak.getMember("height").asInt());
setFpks(tIdx, trak.getMember("fpks").asInt());
}else if (trak.getMember("type").asString() == "audio"){
// rate channels size
setRate(tIdx, trak.getMember("rate").asInt());
setChannels(tIdx, trak.getMember("channels").asInt());
setSize(tIdx, trak.getMember("size").asInt());
}
Track &s = tracks[tIdx];
s.fragments.addRecords(fragCount);
uint64_t *vals = (uint64_t *)malloc(4 * fragCount * sizeof(uint64_t));
for (int i = 0; i < fragCount; i++){
char *ptr = fragStor + (i * DTSH_FRAGMENT_SIZE);
vals[i] = Bit::btohl(ptr);
vals[fragCount + i] = ptr[4];
vals[(2 * fragCount) + i] = Bit::btohl(ptr + 5) - 1;
vals[(3 * fragCount) + i] = Bit::btohl(ptr + 9);
}
s.fragments.setInts("duration", vals, fragCount);
s.fragments.setInts("keys", vals + fragCount, fragCount);
s.fragments.setInts("firstkey", vals + (2 * fragCount), fragCount);
s.fragments.setInts("size", vals + (3 * fragCount), fragCount);
vals = (uint64_t *)realloc(vals, 7 * keyCount * sizeof(uint64_t));
s.keys.addRecords(keyCount);
uint64_t totalPartCount = 0;
for (int i = 0; i < keyCount; i++){
char *ptr = keyStor + (i * DTSH_KEY_SIZE);
vals[i] = Bit::btohll(ptr);
vals[keyCount + i] = Bit::btoh24(ptr + 8);
vals[(2 * keyCount) + i] = Bit::btohl(ptr + 11);
vals[(3 * keyCount) + i] = Bit::btohs(ptr + 15);
vals[(4 * keyCount) + i] = Bit::btohll(ptr + 17);
vals[(5 * keyCount) + i] = Bit::btohl(keySizeStor + (i * 4)); // NOT WITH ptr!!
vals[(6 * keyCount) + i] = totalPartCount;
totalPartCount += vals[(3 * keyCount) + i];
}
s.keys.setInts("bpos", vals, keyCount);
s.keys.setInts("duration", vals + keyCount, keyCount);
s.keys.setInts("number", vals + (2 * keyCount), keyCount);
s.keys.setInts("parts", vals + (3 * keyCount), keyCount);
s.keys.setInts("time", vals + (4 * keyCount), keyCount);
s.keys.setInts("size", vals + (5 * keyCount), keyCount);
s.keys.setInts("firstpart", vals + (6 * keyCount), keyCount);
vals = (uint64_t *)realloc(vals, 3 * partCount * sizeof(uint64_t));
s.parts.addRecords(partCount);
for (int i = 0; i < partCount; i++){
char *ptr = partStor + (i * DTSH_PART_SIZE);
vals[i] = Bit::btoh24(ptr);
vals[partCount + i] = Bit::btoh24(ptr + 3);
vals[(2 * partCount) + i] = Bit::btoh24(ptr + 6);
}
s.parts.setInts("size", vals, partCount);
s.parts.setInts("duration", vals + partCount, partCount);
s.parts.setInts("offset", vals + (2 * partCount), partCount);
free(vals);
} }
size_t tIdx = addTrack(fragCount, keyCount, partCount);
setType(tIdx, trak.getMember("type").asString());
setCodec(tIdx, trak.getMember("codec").asString());
setInit(tIdx, trak.getMember("init").asString());
setID(tIdx, trak.getMember("trackid").asInt());
setFirstms(tIdx, trak.getMember("firstms").asInt());
setLastms(tIdx, trak.getMember("lastms").asInt());
setBps(tIdx, trak.getMember("bps").asInt());
setMaxBps(tIdx, trak.getMember("maxbps").asInt());
setSourceTrack(tIdx, INVALID_TRACK_ID);
if (trak.getMember("type").asString() == "video"){
setWidth(tIdx, trak.getMember("width").asInt());
setHeight(tIdx, trak.getMember("height").asInt());
setFpks(tIdx, trak.getMember("fpks").asInt());
}else if (trak.getMember("type").asString() == "audio"){
// rate channels size
setRate(tIdx, trak.getMember("rate").asInt());
setChannels(tIdx, trak.getMember("channels").asInt());
setSize(tIdx, trak.getMember("size").asInt());
}
//Do not parse any of the more complex data, if any of it is missing.
if (!fragLen || !keyLen || !partLen || !keySizeLen){return;}
//Ok, we have data, let's parse it, too.
Track &s = tracks[tIdx];
s.fragments.addRecords(fragCount);
uint64_t *vals = (uint64_t *)malloc(4 * fragCount * sizeof(uint64_t));
for (int i = 0; i < fragCount; i++){
char *ptr = fragStor + (i * DTSH_FRAGMENT_SIZE);
vals[i] = Bit::btohl(ptr);
vals[fragCount + i] = ptr[4];
vals[(2 * fragCount) + i] = Bit::btohl(ptr + 5) - 1;
vals[(3 * fragCount) + i] = Bit::btohl(ptr + 9);
}
s.fragments.setInts("duration", vals, fragCount);
s.fragments.setInts("keys", vals + fragCount, fragCount);
s.fragments.setInts("firstkey", vals + (2 * fragCount), fragCount);
s.fragments.setInts("size", vals + (3 * fragCount), fragCount);
vals = (uint64_t *)realloc(vals, 7 * keyCount * sizeof(uint64_t));
s.keys.addRecords(keyCount);
uint64_t totalPartCount = 0;
for (int i = 0; i < keyCount; i++){
char *ptr = keyStor + (i * DTSH_KEY_SIZE);
vals[i] = Bit::btohll(ptr);
vals[keyCount + i] = Bit::btoh24(ptr + 8);
vals[(2 * keyCount) + i] = Bit::btohl(ptr + 11);
vals[(3 * keyCount) + i] = Bit::btohs(ptr + 15);
vals[(4 * keyCount) + i] = Bit::btohll(ptr + 17);
vals[(5 * keyCount) + i] = Bit::btohl(keySizeStor + (i * 4)); // NOT WITH ptr!!
vals[(6 * keyCount) + i] = totalPartCount;
totalPartCount += vals[(3 * keyCount) + i];
}
s.keys.setInts("bpos", vals, keyCount);
s.keys.setInts("duration", vals + keyCount, keyCount);
s.keys.setInts("number", vals + (2 * keyCount), keyCount);
s.keys.setInts("parts", vals + (3 * keyCount), keyCount);
s.keys.setInts("time", vals + (4 * keyCount), keyCount);
s.keys.setInts("size", vals + (5 * keyCount), keyCount);
s.keys.setInts("firstpart", vals + (6 * keyCount), keyCount);
vals = (uint64_t *)realloc(vals, 3 * partCount * sizeof(uint64_t));
s.parts.addRecords(partCount);
for (int i = 0; i < partCount; i++){
char *ptr = partStor + (i * DTSH_PART_SIZE);
vals[i] = Bit::btoh24(ptr);
vals[partCount + i] = Bit::btoh24(ptr + 3);
vals[(2 * partCount) + i] = Bit::btoh24(ptr + 6);
}
s.parts.setInts("size", vals, partCount);
s.parts.setInts("duration", vals + partCount, partCount);
s.parts.setInts("offset", vals + (2 * partCount), partCount);
free(vals);
} }
/// Simply calls clear() /// Simply calls clear()
@ -2428,7 +2439,9 @@ namespace DTSC{
///\brief Determines the "packed" size of a Meta object ///\brief Determines the "packed" size of a Meta object
uint64_t Meta::getSendLen(bool skipDynamic, std::set<size_t> selectedTracks) const{ uint64_t Meta::getSendLen(bool skipDynamic, std::set<size_t> selectedTracks) const{
uint64_t dataLen = 48; // + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; uint64_t dataLen = 34; // + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21;
if (getVod()){dataLen += 14;}
if (getLive()){dataLen += 15;}
for (std::map<size_t, Track>::const_iterator it = tracks.begin(); it != tracks.end(); it++){ for (std::map<size_t, Track>::const_iterator it = tracks.begin(); it != tracks.end(); it++){
if (!it->second.parts.getPresent()){continue;} if (!it->second.parts.getPresent()){continue;}
if (!selectedTracks.size() || selectedTracks.count(it->first)){ if (!selectedTracks.size() || selectedTracks.count(it->first)){
@ -2500,7 +2513,8 @@ namespace DTSC{
oFile.write(DTSC::Magic_Header, 4); oFile.write(DTSC::Magic_Header, 4);
oFile.write(c32(lVarSize + getSendLen() - 8), 4); oFile.write(c32(lVarSize + getSendLen() - 8), 4);
oFile.write("\340", 1); oFile.write("\340", 1);
oFile.write("\000\003vod\001\000\000\000\000\000\000\000\001", 14); if (getVod()){oFile.write("\000\003vod\001\000\000\000\000\000\000\000\001", 14);}
if (getLive()){oFile.write("\000\004live\001\000\000\000\000\000\000\000\001", 15);}
oFile.write("\000\007version\001", 10); oFile.write("\000\007version\001", 10);
oFile.write(c64(DTSH_VERSION), 8); oFile.write(c64(DTSH_VERSION), 8);
if (lVarSize){ if (lVarSize){
@ -2683,7 +2697,8 @@ namespace DTSC{
conn.SendNow(DTSC::Magic_Header, 4); conn.SendNow(DTSC::Magic_Header, 4);
conn.SendNow(c32(getSendLen(skipDynamic, selectedTracks) - 8), 4); conn.SendNow(c32(getSendLen(skipDynamic, selectedTracks) - 8), 4);
conn.SendNow("\340", 1); conn.SendNow("\340", 1);
conn.SendNow("\000\003vod\001\000\000\000\000\000\000\000\001", 14); if (getVod()){conn.SendNow("\000\003vod\001\000\000\000\000\000\000\000\001", 14);}
if (getLive()){conn.SendNow("\000\004live\001\000\000\000\000\000\000\000\001", 15);}
conn.SendNow("\000\007version\001", 10); conn.SendNow("\000\007version\001", 10);
conn.SendNow(c64(DTSH_VERSION), 8); conn.SendNow(c64(DTSH_VERSION), 8);
conn.SendNow("\000\006tracks\340", 9); conn.SendNow("\000\006tracks\340", 9);

View file

@ -278,6 +278,7 @@ namespace DTSC{
void reInit(const std::string &_streamName, bool master = true); void reInit(const std::string &_streamName, bool master = true);
void reInit(const std::string &_streamName, const std::string &fileName); void reInit(const std::string &_streamName, const std::string &fileName);
void reInit(const std::string &_streamName, const DTSC::Scan &src); void reInit(const std::string &_streamName, const DTSC::Scan &src);
void addTrackFrom(const DTSC::Scan &src);
void refresh(); void refresh();

View file

@ -6,12 +6,49 @@
#include <mist/defines.h> #include <mist/defines.h>
#include <mist/stream.h> #include <mist/stream.h>
#include <mist/triggers.h> #include <mist/triggers.h>
#include <mist/http_parser.h>
#include <sys/stat.h> #include <sys/stat.h>
namespace Mist{ namespace Mist{
OutDTSC::OutDTSC(Socket::Connection &conn) : Output(conn){ OutDTSC::OutDTSC(Socket::Connection &conn) : Output(conn){
setBlocking(true);
JSON::Value prep; JSON::Value prep;
if (config->getString("target").size()){
streamName = config->getString("streamname");
pushUrl = HTTP::URL(config->getString("target"));
if (pushUrl.protocol != "dtsc"){
onFail("Target must start with dtsc://", true);
return;
}
if (!pushUrl.path.size()){pushUrl.path = streamName;}
INFO_MSG("About to push stream %s out. Host: %s, port: %d, target stream: %s", streamName.c_str(),
pushUrl.host.c_str(), pushUrl.getPort(), pushUrl.path.c_str());
myConn.close();
myConn.Received().clear();
myConn.open(pushUrl.host, pushUrl.getPort(), true);
initialize();
initialSeek();
if (!myConn){
onFail("Could not start push, aborting", true);
return;
}
prep["cmd"] = "push";
prep["version"] = APPIDENT;
prep["stream"] = pushUrl.path;
std::map<std::string, std::string> args;
HTTP::parseVars(pushUrl.args, args);
if (args.count("pass")){prep["password"] = args["pass"];}
if (args.count("pw")){prep["password"] = args["pw"];}
if (args.count("password")){prep["password"] = args["password"];}
if (pushUrl.pass.size()){prep["password"] = pushUrl.pass;}
sendCmd(prep);
wantRequest = true;
parseData = true;
return;
}
setBlocking(true);
prep["cmd"] = "hi"; prep["cmd"] = "hi";
prep["version"] = APPIDENT; prep["version"] = APPIDENT;
prep["pack_method"] = 2; prep["pack_method"] = 2;
@ -58,6 +95,19 @@ namespace Mist{
capa["desc"] = "Real time streaming over DTSC (proprietary protocol for efficient inter-server streaming)"; capa["desc"] = "Real time streaming over DTSC (proprietary protocol for efficient inter-server streaming)";
capa["deps"] = ""; capa["deps"] = "";
capa["codecs"][0u][0u].append("+*"); capa["codecs"][0u][0u].append("+*");
capa["push_urls"].append("dtsc://*");
capa["incoming_push_url"] = "dtsc://$host:$port/$stream?pass=$password";
JSON::Value opt;
opt["arg"] = "string";
opt["default"] = "";
opt["arg_num"] = 1;
opt["help"] = "Target DTSC URL to push out towards.";
cfg->addOption("target", opt);
cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":"
"\"stream\",\"help\":\"The name of the stream to "
"push out, when pushing out.\"}"));
cfg->addConnectorOptions(4200, capa); cfg->addConnectorOptions(4200, capa);
config = cfg; config = cfg;
} }
@ -123,14 +173,9 @@ namespace Mist{
void OutDTSC::sendHeader(){ void OutDTSC::sendHeader(){
sentHeader = true; sentHeader = true;
userSelect.clear();
std::set<size_t> validTracks = M.getValidTracks();
std::set<size_t> selectedTracks; std::set<size_t> selectedTracks;
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){ for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (M.getType(*it) == "video" || M.getType(*it) == "audio"){ selectedTracks.insert(it->first);
userSelect[*it].reload(streamName, *it);
selectedTracks.insert(*it);
}
} }
M.send(myConn, true, selectedTracks, true); M.send(myConn, true, selectedTracks, true);
if (M.getLive()){realTime = 0;} if (M.getLive()){realTime = 0;}
@ -154,7 +199,7 @@ namespace Mist{
myConn.Received().remove(8); myConn.Received().remove(8);
std::string dataPacket = myConn.Received().remove(rSize); std::string dataPacket = myConn.Received().remove(rSize);
DTSC::Scan dScan((char *)dataPacket.data(), rSize); DTSC::Scan dScan((char *)dataPacket.data(), rSize);
INFO_MSG("Received DTCM: %s", dScan.asJSON().toString().c_str()); HIGH_MSG("Received DTCM: %s", dScan.asJSON().toString().c_str());
if (dScan.getMember("cmd").asString() == "push"){ if (dScan.getMember("cmd").asString() == "push"){
handlePush(dScan); handlePush(dScan);
continue; continue;
@ -171,12 +216,16 @@ namespace Mist{
INFO_MSG("Ok: %s", dScan.getMember("msg").asString().c_str()); INFO_MSG("Ok: %s", dScan.getMember("msg").asString().c_str());
continue; continue;
} }
if (dScan.getMember("cmd").asString() == "hi"){
INFO_MSG("Connected to server running version %s", dScan.getMember("version").asString().c_str());
continue;
}
if (dScan.getMember("cmd").asString() == "error"){ if (dScan.getMember("cmd").asString() == "error"){
ERROR_MSG("%s", dScan.getMember("msg").asString().c_str()); ERROR_MSG("%s", dScan.getMember("msg").asString().c_str());
continue; continue;
} }
if (dScan.getMember("cmd").asString() == "reset"){ if (dScan.getMember("cmd").asString() == "reset"){
meta.reInit(streamName); userSelect.clear();
sendOk("Internal state reset"); sendOk("Internal state reset");
continue; continue;
} }
@ -192,9 +241,24 @@ namespace Mist{
if (!myConn.Received().available(8 + rSize)){return;}// abort - not enough data yet if (!myConn.Received().available(8 + rSize)){return;}// abort - not enough data yet
std::string dataPacket = myConn.Received().remove(8 + rSize); std::string dataPacket = myConn.Received().remove(8 + rSize);
DTSC::Packet metaPack(dataPacket.data(), dataPacket.size()); DTSC::Packet metaPack(dataPacket.data(), dataPacket.size());
meta.reInit(streamName, metaPack.getScan()); DTSC::Scan metaScan = metaPack.getScan();
meta.refresh();
size_t prevTracks = meta.getValidTracks().size();
size_t tNum = metaScan.getMember("tracks").getSize();
for (int i = 0; i < tNum; i++){
DTSC::Scan trk = metaScan.getMember("tracks").getIndice(i);
size_t trackID = trk.getMember("trackid").asInt();
if (meta.trackIDToIndex(trackID, getpid()) == INVALID_TRACK_ID){
MEDIUM_MSG("Adding track: %s", trk.asJSON().toString().c_str());
meta.addTrackFrom(trk);
}else{
HIGH_MSG("Already had track: %s", trk.asJSON().toString().c_str());
}
}
meta.refresh();
std::stringstream rep; std::stringstream rep;
rep << "DTSC_HEAD received with " << M.getValidTracks().size() << " tracks. Bring on those data packets!"; rep << "DTSC_HEAD parsed, we went from " << prevTracks << " to " << meta.getValidTracks().size() << " tracks. Bring on those data packets!";
sendOk(rep.str()); sendOk(rep.str());
}else if (myConn.Received().copy(4) == "DTP2"){ }else if (myConn.Received().copy(4) == "DTP2"){
if (!isPushing()){ if (!isPushing()){
@ -207,11 +271,19 @@ namespace Mist{
if (!myConn.Received().available(8 + rSize)){return;}// abort - not enough data yet if (!myConn.Received().available(8 + rSize)){return;}// abort - not enough data yet
std::string dataPacket = myConn.Received().remove(8 + rSize); std::string dataPacket = myConn.Received().remove(8 + rSize);
DTSC::Packet inPack(dataPacket.data(), dataPacket.size(), true); DTSC::Packet inPack(dataPacket.data(), dataPacket.size(), true);
if (M.trackIDToIndex(inPack.getTrackId(), getpid()) == INVALID_TRACK_ID){ size_t tid = M.trackIDToIndex(inPack.getTrackId(), getpid());
onFail("DTSC_V2 received for a track that was not announced in the DTSC_HEAD!", true); if (tid == INVALID_TRACK_ID){
//WARN_MSG("Received data for unknown track: %zu", inPack.getTrackId());
onFail("DTSC_V2 received for a track that was not announced in a header!", true);
return; return;
} }
bufferLivePacket(inPack); if (!userSelect.count(tid)){
userSelect[tid].reload(streamName, tid, COMM_STATUS_SOURCE);
}
char *data;
size_t dataLen;
inPack.getString("data", data, dataLen);
bufferLivePacket(inPack.getTime(), inPack.getInt("offset"), tid, data, dataLen, inPack.getInt("bpos"), inPack.getFlag("keyframe"));
}else{ }else{
// Invalid // Invalid
onFail("Invalid packet header received. Aborting.", true); onFail("Invalid packet header received. Aborting.", true);

View file

@ -1,4 +1,5 @@
#include "output.h" #include "output.h"
#include <mist/url.h>
namespace Mist{ namespace Mist{
@ -11,6 +12,7 @@ namespace Mist{
void sendNext(); void sendNext();
void sendHeader(); void sendHeader();
void initialSeek(); void initialSeek();
static bool listenMode(){return !(config->getString("target").size());}
void onFail(const std::string &msg, bool critical = false); void onFail(const std::string &msg, bool critical = false);
void stats(bool force = false); void stats(bool force = false);
void sendCmd(const JSON::Value &data); void sendCmd(const JSON::Value &data);
@ -20,6 +22,7 @@ namespace Mist{
unsigned int lastActive; ///< Time of last sending of data. unsigned int lastActive; ///< Time of last sending of data.
std::string getStatsName(); std::string getStatsName();
std::string salt; std::string salt;
HTTP::URL pushUrl;
void handlePush(DTSC::Scan &dScan); void handlePush(DTSC::Scan &dScan);
void handlePlay(DTSC::Scan &dScan); void handlePlay(DTSC::Scan &dScan);
}; };