diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index ffa935fc..c7de596e 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -969,6 +969,7 @@ namespace DTSC{ streamInit(); setVod(src.hasMember("vod") && src.getMember("vod").asInt()); + setLive(src.hasMember("live") && src.getMember("live").asInt()); version = src.getMember("version").asInt(); @@ -978,98 +979,108 @@ namespace DTSC{ size_t tNum = src.getMember("tracks").getSize(); for (int i = 0; i < tNum; i++){ - DTSC::Scan trak = src.getMember("tracks").getIndice(i); + addTrackFrom(src.getMember("tracks").getIndice(i)); + } + } - char *fragStor; - char *keyStor; - char *partStor; - char *keySizeStor; - size_t fragLen; - size_t keyLen; - size_t partLen; - size_t keySizeLen; + void Meta::addTrackFrom(const DTSC::Scan &trak){ + char *fragStor = 0; + char *keyStor = 0; + char *partStor = 0; + char *keySizeStor = 0; + size_t fragLen = 0; + size_t keyLen = 0; + size_t partLen = 0; + size_t keySizeLen = 0; + uint32_t fragCount = DEFAULT_FRAGMENT_COUNT; + uint32_t keyCount = DEFAULT_KEY_COUNT; + uint32_t partCount = DEFAULT_PART_COUNT; + + if (trak.hasMember("fragments") && trak.hasMember("keys") && trak.hasMember("parts") && trak.hasMember("keysizes")){ trak.getMember("fragments").getString(fragStor, fragLen); trak.getMember("keys").getString(keyStor, keyLen); trak.getMember("parts").getString(partStor, partLen); trak.getMember("keysizes").getString(keySizeStor, keySizeLen); - uint32_t fragCount = fragLen / DTSH_FRAGMENT_SIZE; - uint32_t keyCount = keyLen / DTSH_KEY_SIZE; - uint32_t partCount = partLen / DTSH_PART_SIZE; - size_t tIdx = addTrack(fragCount ? fragCount : DEFAULT_FRAGMENT_COUNT, keyCount ? keyCount : DEFAULT_KEY_COUNT, - partCount ? partCount : DEFAULT_PART_COUNT); - - setType(tIdx, trak.getMember("type").asString()); - setCodec(tIdx, trak.getMember("codec").asString()); - setInit(tIdx, trak.getMember("init").asString()); - setID(tIdx, trak.getMember("trackid").asInt()); - setFirstms(tIdx, trak.getMember("firstms").asInt()); - setLastms(tIdx, trak.getMember("lastms").asInt()); - setBps(tIdx, trak.getMember("bps").asInt()); - setMaxBps(tIdx, trak.getMember("maxbps").asInt()); - setSourceTrack(tIdx, INVALID_TRACK_ID); - if (trak.getMember("type").asString() == "video"){ - setWidth(tIdx, trak.getMember("width").asInt()); - setHeight(tIdx, trak.getMember("height").asInt()); - setFpks(tIdx, trak.getMember("fpks").asInt()); - - }else if (trak.getMember("type").asString() == "audio"){ - // rate channels size - setRate(tIdx, trak.getMember("rate").asInt()); - setChannels(tIdx, trak.getMember("channels").asInt()); - setSize(tIdx, trak.getMember("size").asInt()); - } - - Track &s = tracks[tIdx]; - - s.fragments.addRecords(fragCount); - uint64_t *vals = (uint64_t *)malloc(4 * fragCount * sizeof(uint64_t)); - for (int i = 0; i < fragCount; i++){ - char *ptr = fragStor + (i * DTSH_FRAGMENT_SIZE); - vals[i] = Bit::btohl(ptr); - vals[fragCount + i] = ptr[4]; - vals[(2 * fragCount) + i] = Bit::btohl(ptr + 5) - 1; - vals[(3 * fragCount) + i] = Bit::btohl(ptr + 9); - } - s.fragments.setInts("duration", vals, fragCount); - s.fragments.setInts("keys", vals + fragCount, fragCount); - s.fragments.setInts("firstkey", vals + (2 * fragCount), fragCount); - s.fragments.setInts("size", vals + (3 * fragCount), fragCount); - - vals = (uint64_t *)realloc(vals, 7 * keyCount * sizeof(uint64_t)); - s.keys.addRecords(keyCount); - uint64_t totalPartCount = 0; - for (int i = 0; i < keyCount; i++){ - char *ptr = keyStor + (i * DTSH_KEY_SIZE); - vals[i] = Bit::btohll(ptr); - vals[keyCount + i] = Bit::btoh24(ptr + 8); - vals[(2 * keyCount) + i] = Bit::btohl(ptr + 11); - vals[(3 * keyCount) + i] = Bit::btohs(ptr + 15); - vals[(4 * keyCount) + i] = Bit::btohll(ptr + 17); - vals[(5 * keyCount) + i] = Bit::btohl(keySizeStor + (i * 4)); // NOT WITH ptr!! - vals[(6 * keyCount) + i] = totalPartCount; - totalPartCount += vals[(3 * keyCount) + i]; - } - s.keys.setInts("bpos", vals, keyCount); - s.keys.setInts("duration", vals + keyCount, keyCount); - s.keys.setInts("number", vals + (2 * keyCount), keyCount); - s.keys.setInts("parts", vals + (3 * keyCount), keyCount); - s.keys.setInts("time", vals + (4 * keyCount), keyCount); - s.keys.setInts("size", vals + (5 * keyCount), keyCount); - s.keys.setInts("firstpart", vals + (6 * keyCount), keyCount); - - vals = (uint64_t *)realloc(vals, 3 * partCount * sizeof(uint64_t)); - s.parts.addRecords(partCount); - for (int i = 0; i < partCount; i++){ - char *ptr = partStor + (i * DTSH_PART_SIZE); - vals[i] = Bit::btoh24(ptr); - vals[partCount + i] = Bit::btoh24(ptr + 3); - vals[(2 * partCount) + i] = Bit::btoh24(ptr + 6); - } - s.parts.setInts("size", vals, partCount); - s.parts.setInts("duration", vals + partCount, partCount); - s.parts.setInts("offset", vals + (2 * partCount), partCount); - free(vals); + fragCount = fragLen / DTSH_FRAGMENT_SIZE; + keyCount = keyLen / DTSH_KEY_SIZE; + partCount = partLen / DTSH_PART_SIZE; } + size_t tIdx = addTrack(fragCount, keyCount, partCount); + + setType(tIdx, trak.getMember("type").asString()); + setCodec(tIdx, trak.getMember("codec").asString()); + setInit(tIdx, trak.getMember("init").asString()); + setID(tIdx, trak.getMember("trackid").asInt()); + setFirstms(tIdx, trak.getMember("firstms").asInt()); + setLastms(tIdx, trak.getMember("lastms").asInt()); + setBps(tIdx, trak.getMember("bps").asInt()); + setMaxBps(tIdx, trak.getMember("maxbps").asInt()); + setSourceTrack(tIdx, INVALID_TRACK_ID); + if (trak.getMember("type").asString() == "video"){ + setWidth(tIdx, trak.getMember("width").asInt()); + setHeight(tIdx, trak.getMember("height").asInt()); + setFpks(tIdx, trak.getMember("fpks").asInt()); + + }else if (trak.getMember("type").asString() == "audio"){ + // rate channels size + setRate(tIdx, trak.getMember("rate").asInt()); + setChannels(tIdx, trak.getMember("channels").asInt()); + setSize(tIdx, trak.getMember("size").asInt()); + } + + //Do not parse any of the more complex data, if any of it is missing. + if (!fragLen || !keyLen || !partLen || !keySizeLen){return;} + + //Ok, we have data, let's parse it, too. + Track &s = tracks[tIdx]; + s.fragments.addRecords(fragCount); + uint64_t *vals = (uint64_t *)malloc(4 * fragCount * sizeof(uint64_t)); + for (int i = 0; i < fragCount; i++){ + char *ptr = fragStor + (i * DTSH_FRAGMENT_SIZE); + vals[i] = Bit::btohl(ptr); + vals[fragCount + i] = ptr[4]; + vals[(2 * fragCount) + i] = Bit::btohl(ptr + 5) - 1; + vals[(3 * fragCount) + i] = Bit::btohl(ptr + 9); + } + s.fragments.setInts("duration", vals, fragCount); + s.fragments.setInts("keys", vals + fragCount, fragCount); + s.fragments.setInts("firstkey", vals + (2 * fragCount), fragCount); + s.fragments.setInts("size", vals + (3 * fragCount), fragCount); + + vals = (uint64_t *)realloc(vals, 7 * keyCount * sizeof(uint64_t)); + s.keys.addRecords(keyCount); + uint64_t totalPartCount = 0; + for (int i = 0; i < keyCount; i++){ + char *ptr = keyStor + (i * DTSH_KEY_SIZE); + vals[i] = Bit::btohll(ptr); + vals[keyCount + i] = Bit::btoh24(ptr + 8); + vals[(2 * keyCount) + i] = Bit::btohl(ptr + 11); + vals[(3 * keyCount) + i] = Bit::btohs(ptr + 15); + vals[(4 * keyCount) + i] = Bit::btohll(ptr + 17); + vals[(5 * keyCount) + i] = Bit::btohl(keySizeStor + (i * 4)); // NOT WITH ptr!! + vals[(6 * keyCount) + i] = totalPartCount; + totalPartCount += vals[(3 * keyCount) + i]; + } + s.keys.setInts("bpos", vals, keyCount); + s.keys.setInts("duration", vals + keyCount, keyCount); + s.keys.setInts("number", vals + (2 * keyCount), keyCount); + s.keys.setInts("parts", vals + (3 * keyCount), keyCount); + s.keys.setInts("time", vals + (4 * keyCount), keyCount); + s.keys.setInts("size", vals + (5 * keyCount), keyCount); + s.keys.setInts("firstpart", vals + (6 * keyCount), keyCount); + + vals = (uint64_t *)realloc(vals, 3 * partCount * sizeof(uint64_t)); + s.parts.addRecords(partCount); + for (int i = 0; i < partCount; i++){ + char *ptr = partStor + (i * DTSH_PART_SIZE); + vals[i] = Bit::btoh24(ptr); + vals[partCount + i] = Bit::btoh24(ptr + 3); + vals[(2 * partCount) + i] = Bit::btoh24(ptr + 6); + } + s.parts.setInts("size", vals, partCount); + s.parts.setInts("duration", vals + partCount, partCount); + s.parts.setInts("offset", vals + (2 * partCount), partCount); + free(vals); } /// Simply calls clear() @@ -2428,7 +2439,9 @@ namespace DTSC{ ///\brief Determines the "packed" size of a Meta object uint64_t Meta::getSendLen(bool skipDynamic, std::set selectedTracks) const{ - uint64_t dataLen = 48; // + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; + uint64_t dataLen = 34; // + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; + if (getVod()){dataLen += 14;} + if (getLive()){dataLen += 15;} for (std::map::const_iterator it = tracks.begin(); it != tracks.end(); it++){ if (!it->second.parts.getPresent()){continue;} if (!selectedTracks.size() || selectedTracks.count(it->first)){ @@ -2500,7 +2513,8 @@ namespace DTSC{ oFile.write(DTSC::Magic_Header, 4); oFile.write(c32(lVarSize + getSendLen() - 8), 4); oFile.write("\340", 1); - oFile.write("\000\003vod\001\000\000\000\000\000\000\000\001", 14); + if (getVod()){oFile.write("\000\003vod\001\000\000\000\000\000\000\000\001", 14);} + if (getLive()){oFile.write("\000\004live\001\000\000\000\000\000\000\000\001", 15);} oFile.write("\000\007version\001", 10); oFile.write(c64(DTSH_VERSION), 8); if (lVarSize){ @@ -2683,7 +2697,8 @@ namespace DTSC{ conn.SendNow(DTSC::Magic_Header, 4); conn.SendNow(c32(getSendLen(skipDynamic, selectedTracks) - 8), 4); conn.SendNow("\340", 1); - conn.SendNow("\000\003vod\001\000\000\000\000\000\000\000\001", 14); + if (getVod()){conn.SendNow("\000\003vod\001\000\000\000\000\000\000\000\001", 14);} + if (getLive()){conn.SendNow("\000\004live\001\000\000\000\000\000\000\000\001", 15);} conn.SendNow("\000\007version\001", 10); conn.SendNow(c64(DTSH_VERSION), 8); conn.SendNow("\000\006tracks\340", 9); diff --git a/lib/dtsc.h b/lib/dtsc.h index f04bbf68..d8b2b544 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -278,6 +278,7 @@ namespace DTSC{ void reInit(const std::string &_streamName, bool master = true); void reInit(const std::string &_streamName, const std::string &fileName); void reInit(const std::string &_streamName, const DTSC::Scan &src); + void addTrackFrom(const DTSC::Scan &src); void refresh(); diff --git a/src/output/output_dtsc.cpp b/src/output/output_dtsc.cpp index a4235bdb..d8abd71f 100644 --- a/src/output/output_dtsc.cpp +++ b/src/output/output_dtsc.cpp @@ -6,12 +6,49 @@ #include #include #include +#include #include namespace Mist{ OutDTSC::OutDTSC(Socket::Connection &conn) : Output(conn){ - setBlocking(true); JSON::Value prep; + if (config->getString("target").size()){ + streamName = config->getString("streamname"); + pushUrl = HTTP::URL(config->getString("target")); + if (pushUrl.protocol != "dtsc"){ + onFail("Target must start with dtsc://", true); + return; + } + + if (!pushUrl.path.size()){pushUrl.path = streamName;} + INFO_MSG("About to push stream %s out. Host: %s, port: %d, target stream: %s", streamName.c_str(), + pushUrl.host.c_str(), pushUrl.getPort(), pushUrl.path.c_str()); + myConn.close(); + myConn.Received().clear(); + myConn.open(pushUrl.host, pushUrl.getPort(), true); + initialize(); + initialSeek(); + if (!myConn){ + onFail("Could not start push, aborting", true); + return; + } + prep["cmd"] = "push"; + prep["version"] = APPIDENT; + prep["stream"] = pushUrl.path; + std::map args; + HTTP::parseVars(pushUrl.args, args); + if (args.count("pass")){prep["password"] = args["pass"];} + if (args.count("pw")){prep["password"] = args["pw"];} + if (args.count("password")){prep["password"] = args["password"];} + if (pushUrl.pass.size()){prep["password"] = pushUrl.pass;} + sendCmd(prep); + wantRequest = true; + parseData = true; + return; + } + + + setBlocking(true); prep["cmd"] = "hi"; prep["version"] = APPIDENT; prep["pack_method"] = 2; @@ -58,6 +95,19 @@ namespace Mist{ capa["desc"] = "Real time streaming over DTSC (proprietary protocol for efficient inter-server streaming)"; capa["deps"] = ""; capa["codecs"][0u][0u].append("+*"); + capa["push_urls"].append("dtsc://*"); + capa["incoming_push_url"] = "dtsc://$host:$port/$stream?pass=$password"; + + JSON::Value opt; + opt["arg"] = "string"; + opt["default"] = ""; + opt["arg_num"] = 1; + opt["help"] = "Target DTSC URL to push out towards."; + cfg->addOption("target", opt); + cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":" + "\"stream\",\"help\":\"The name of the stream to " + "push out, when pushing out.\"}")); + cfg->addConnectorOptions(4200, capa); config = cfg; } @@ -123,14 +173,9 @@ namespace Mist{ void OutDTSC::sendHeader(){ sentHeader = true; - userSelect.clear(); - std::set validTracks = M.getValidTracks(); std::set selectedTracks; - for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ - if (M.getType(*it) == "video" || M.getType(*it) == "audio"){ - userSelect[*it].reload(streamName, *it); - selectedTracks.insert(*it); - } + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + selectedTracks.insert(it->first); } M.send(myConn, true, selectedTracks, true); if (M.getLive()){realTime = 0;} @@ -154,7 +199,7 @@ namespace Mist{ myConn.Received().remove(8); std::string dataPacket = myConn.Received().remove(rSize); DTSC::Scan dScan((char *)dataPacket.data(), rSize); - INFO_MSG("Received DTCM: %s", dScan.asJSON().toString().c_str()); + HIGH_MSG("Received DTCM: %s", dScan.asJSON().toString().c_str()); if (dScan.getMember("cmd").asString() == "push"){ handlePush(dScan); continue; @@ -171,12 +216,16 @@ namespace Mist{ INFO_MSG("Ok: %s", dScan.getMember("msg").asString().c_str()); continue; } + if (dScan.getMember("cmd").asString() == "hi"){ + INFO_MSG("Connected to server running version %s", dScan.getMember("version").asString().c_str()); + continue; + } if (dScan.getMember("cmd").asString() == "error"){ ERROR_MSG("%s", dScan.getMember("msg").asString().c_str()); continue; } if (dScan.getMember("cmd").asString() == "reset"){ - meta.reInit(streamName); + userSelect.clear(); sendOk("Internal state reset"); continue; } @@ -192,9 +241,24 @@ namespace Mist{ if (!myConn.Received().available(8 + rSize)){return;}// abort - not enough data yet std::string dataPacket = myConn.Received().remove(8 + rSize); DTSC::Packet metaPack(dataPacket.data(), dataPacket.size()); - meta.reInit(streamName, metaPack.getScan()); + DTSC::Scan metaScan = metaPack.getScan(); + meta.refresh(); + size_t prevTracks = meta.getValidTracks().size(); + + size_t tNum = metaScan.getMember("tracks").getSize(); + for (int i = 0; i < tNum; i++){ + DTSC::Scan trk = metaScan.getMember("tracks").getIndice(i); + size_t trackID = trk.getMember("trackid").asInt(); + if (meta.trackIDToIndex(trackID, getpid()) == INVALID_TRACK_ID){ + MEDIUM_MSG("Adding track: %s", trk.asJSON().toString().c_str()); + meta.addTrackFrom(trk); + }else{ + HIGH_MSG("Already had track: %s", trk.asJSON().toString().c_str()); + } + } + meta.refresh(); std::stringstream rep; - rep << "DTSC_HEAD received with " << M.getValidTracks().size() << " tracks. Bring on those data packets!"; + rep << "DTSC_HEAD parsed, we went from " << prevTracks << " to " << meta.getValidTracks().size() << " tracks. Bring on those data packets!"; sendOk(rep.str()); }else if (myConn.Received().copy(4) == "DTP2"){ if (!isPushing()){ @@ -207,11 +271,19 @@ namespace Mist{ if (!myConn.Received().available(8 + rSize)){return;}// abort - not enough data yet std::string dataPacket = myConn.Received().remove(8 + rSize); DTSC::Packet inPack(dataPacket.data(), dataPacket.size(), true); - if (M.trackIDToIndex(inPack.getTrackId(), getpid()) == INVALID_TRACK_ID){ - onFail("DTSC_V2 received for a track that was not announced in the DTSC_HEAD!", true); + size_t tid = M.trackIDToIndex(inPack.getTrackId(), getpid()); + if (tid == INVALID_TRACK_ID){ + //WARN_MSG("Received data for unknown track: %zu", inPack.getTrackId()); + onFail("DTSC_V2 received for a track that was not announced in a header!", true); return; } - bufferLivePacket(inPack); + if (!userSelect.count(tid)){ + userSelect[tid].reload(streamName, tid, COMM_STATUS_SOURCE); + } + char *data; + size_t dataLen; + inPack.getString("data", data, dataLen); + bufferLivePacket(inPack.getTime(), inPack.getInt("offset"), tid, data, dataLen, inPack.getInt("bpos"), inPack.getFlag("keyframe")); }else{ // Invalid onFail("Invalid packet header received. Aborting.", true); diff --git a/src/output/output_dtsc.h b/src/output/output_dtsc.h index 56b8b5ff..84920b9c 100644 --- a/src/output/output_dtsc.h +++ b/src/output/output_dtsc.h @@ -1,4 +1,5 @@ #include "output.h" +#include namespace Mist{ @@ -11,6 +12,7 @@ namespace Mist{ void sendNext(); void sendHeader(); void initialSeek(); + static bool listenMode(){return !(config->getString("target").size());} void onFail(const std::string &msg, bool critical = false); void stats(bool force = false); void sendCmd(const JSON::Value &data); @@ -20,6 +22,7 @@ namespace Mist{ unsigned int lastActive; ///< Time of last sending of data. std::string getStatsName(); std::string salt; + HTTP::URL pushUrl; void handlePush(DTSC::Scan &dScan); void handlePlay(DTSC::Scan &dScan); };