Various metadata-related features and improvements:

- Added support for new "NowMs" field that holds up to where no new packets are guaranteed to show up, in order to lower latency.
- Added support for JSON tracks over all TS-based protocols (input and output)
- Added support for AMF metadata conversion to JSON (RTMP/FLV input)
- Fixed MP4 input subtitle tracks
- Generalized websocket-based outputs to all support the same commands and run the same core logic
- Added new "JSONLine" protocol that allows for generic direct line-by-line ingest of subtitles and/or JSON metadata tracks over a TCP socket or console standard input.
This commit is contained in:
Thulinma 2022-11-09 10:35:07 +01:00
parent c337fff614
commit 3e2a17ff93
36 changed files with 1054 additions and 469 deletions

View file

@ -21,6 +21,7 @@ outputs = [
{'name' : 'WAV', 'format' : 'wav', 'extra': ['http']},
{'name' : 'SDP', 'format' : 'sdp', 'extra': ['http']},
{'name' : 'HTTP', 'format' : 'http_internal', 'extra': ['http','embed']},
{'name' : 'JSONLine', 'format' : 'jsonline'},
]
if usessl

View file

@ -94,7 +94,8 @@ namespace Mist{
dataWaitTimeout = 2500;
pushing = false;
recursingSync = false;
firstTime = 0;
firstTime = Util::bootMS();
thisTime = 0;
firstPacketTime = 0xFFFFFFFFFFFFFFFFull;
lastPacketTime = 0;
tkn = "";
@ -247,7 +248,7 @@ namespace Mist{
size_t mainTrack = getMainSelectedTrack();
if (mainTrack != INVALID_TRACK_ID){
DTSC::Keys keys(M.keys(mainTrack));
if (keys.getValidCount() >= minTracks || M.getLastms(mainTrack) - M.getFirstms(mainTrack) > minMs){
if (keys.getValidCount() >= minTracks || M.getNowms(mainTrack) - M.getFirstms(mainTrack) > minMs){
return true;
}
HIGH_MSG("NOT READY YET (%zu tracks, main track: %zu, with %zu keys)",
@ -393,7 +394,7 @@ namespace Mist{
}
bool autoSeek = buffer.size();
uint64_t seekTarget = buffer.getSyncMode()?currentTime():0;
uint64_t seekTarget = buffer.getSyncMode()?thisTime:0;
std::set<size_t> newSelects =
Util::wouldSelect(M, targetParams, capa, UA, autoSeek ? seekTarget : 0);
@ -401,7 +402,7 @@ namespace Mist{
std::set<size_t> toRemove;
for (std::set<size_t>::iterator it = newSelects.begin(); it != newSelects.end(); it++){
// autoSeeking and target not in bounds? Drop it too.
if (M.getLastms(*it) < std::max(seekTarget, (uint64_t)6000lu) - 6000){
if (M.getNowms(*it) < std::max(seekTarget, (uint64_t)6000lu) - 6000){
toRemove.insert(*it);
}
}
@ -493,7 +494,7 @@ namespace Mist{
//Abort if there are no keys
if (!keys.getValidCount()){return 0;}
//Get the key for the current time
size_t keyNum = M.getKeyNumForTime(trk, lastPacketTime);
size_t keyNum = M.getKeyNumForTime(trk, thisTime);
if (keyNum == INVALID_KEY_NUM){return 0;}
if (keys.getEndValid() <= keyNum+1){return 0;}
//Return the next key
@ -720,6 +721,13 @@ namespace Mist{
return buffer.begin()->time;
}
/// Return the intended target current time of the media buffer (as opposed to actual)
/// This takes into account the current playback speed as well as the maxSkipAhead setting.
uint64_t Output::targetTime(){
if (!realTime){return currentTime();}
return (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead);
}
/// Return the start time of the selected tracks.
/// Returns the start time of earliest track if nothing is selected.
/// Returns zero if no tracks exist.
@ -836,16 +844,16 @@ namespace Mist{
}
HIGH_MSG("Seeking for pos %" PRIu64, pos);
if (meta.getLive() && meta.getLastms(tid) < pos){
if (meta.getLive() && meta.getNowms(tid) < pos){
unsigned int maxTime = 0;
while (meta.getLastms(tid) < pos && myConn && ++maxTime <= 20 && keepGoing()){
while (meta.getNowms(tid) < pos && myConn && ++maxTime <= 20 && keepGoing()){
Util::wait(500);
stats();
}
}
if (meta.getLastms(tid) < pos){
if (meta.getNowms(tid) < pos){
WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: past end of track (= %" PRIu64 "ms).",
pos, tid, meta.getLastms(tid));
pos, tid, meta.getNowms(tid));
userSelect.erase(tid);
return false;
}
@ -884,46 +892,24 @@ namespace Mist{
tmpPack.reInit(curPage[tid].mapped + tmp.offset, 0, true);
tmp.time = tmpPack.getTime();
char *mpd = curPage[tid].mapped;
uint64_t nowMs = M.getNowms(tid);
while (tmp.time < pos && tmpPack){
tmp.offset += tmpPack.getDataLen();
tmpPack.reInit(mpd + tmp.offset, 0, true);
tmp.time = tmpPack.getTime();
}
if (tmpPack){
tmp.ghostPacket = false;
HIGH_MSG("Sought to time %" PRIu64 " in %s", tmp.time, curPage[tid].name.c_str());
tmp.partIndex = M.getPartIndex(tmpPack.getTime(), tmp.tid);
buffer.insert(tmp);
return true;
}
// don't print anything for empty packets - not sign of corruption, just unfinished stream.
if (curPage[tid].mapped[tmp.offset] != 0){
//There's a chance the packet header was written in between this check and the previous.
//Let's check one more time before aborting
tmpPack.reInit(mpd + tmp.offset, 0, true);
tmp.time = tmpPack.getTime();
if (tmpPack){
HIGH_MSG("Sought to time %" PRIu64 " in %s", tmp.time, curPage[tid].name.c_str());
tmp.partIndex = M.getPartIndex(tmpPack.getTime(), tmp.tid);
buffer.insert(tmp);
return true;
}
FAIL_MSG("Noes! Couldn't find packet on track %zu because of some kind of corruption error "
"or somesuch.",
tid);
return false;
}
VERYHIGH_MSG("Track %zu no data (key %" PRIu32 " @ %" PRIu64 ") - waiting...", tid,
keyNum + (getNextKey ? 1 : 0), tmp.offset);
uint32_t i = 0;
while (meta.getVod() && curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){
Util::wait(100 * i);
stats();
}
if (curPage[tid].mapped[tmp.offset]){return seek(tid, pos, getNextKey);}
FAIL_MSG("Track %zu no data (key %" PRIu32 "@%" PRIu64 ", page %s, time %" PRIu64 " -> %" PRIu64 ", next=%" PRIu64 ") - timeout", tid, keyNum + (getNextKey ? 1 : 0), tmp.offset, curPage[tid].name.c_str(), pos, actualKeyTime, keys.getTime(keyNum+1));
userSelect.erase(tid);
firstTime = Util::bootMS() - (buffer.begin()->time * realTime / 1000);
return false;
tmp.partIndex = M.getPartIndex(nowMs, tmp.tid);
tmp.ghostPacket = true;
tmp.time = nowMs;
buffer.insert(tmp);
return true;
}
/// This function decides where in the stream initial playback starts.
@ -948,7 +934,7 @@ namespace Mist{
bool good = true;
// check if all tracks have data for this point in time
for (std::map<size_t, Comms::Users>::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){
if (meta.getLastms(ti->first) < seekPos + needsLookAhead){
if (meta.getNowms(ti->first) < seekPos + needsLookAhead){
good = false;
break;
}
@ -957,15 +943,15 @@ namespace Mist{
HIGH_MSG("Skipping track %zu, not in tracks", ti->first);
continue;
}// ignore missing tracks
if (M.getLastms(ti->first) < seekPos + needsLookAhead + M.getMinKeepAway(ti->first)){
if (M.getNowms(ti->first) < seekPos + needsLookAhead + M.getMinKeepAway(ti->first)){
good = false;
break;
}
if (meta.getLastms(ti->first) == M.getFirstms(ti->first)){
if (meta.getNowms(ti->first) == M.getFirstms(ti->first)){
HIGH_MSG("Skipping track %zu, last equals first", ti->first);
continue;
}// ignore point-tracks
if (meta.getLastms(ti->first) < seekPos){
if (meta.getNowms(ti->first) < seekPos){
good = false;
break;
}
@ -1099,7 +1085,7 @@ namespace Mist{
if (M.getLive() && (targetParams.count("startunix") || targetParams.count("stopunix"))){
uint64_t unixStreamBegin = Util::epoch() - endTime()/1000;
size_t mainTrack = getMainSelectedTrack();
int64_t streamAvail = M.getLastms(mainTrack);
int64_t streamAvail = M.getNowms(mainTrack);
if (targetParams.count("startunix")){
int64_t startUnix = atoll(targetParams["startunix"].c_str());
if (startUnix < 0){
@ -1116,8 +1102,7 @@ namespace Mist{
}
}
if (startUnix < unixStreamBegin){
WARN_MSG("Start time is earlier than stream begin - starting earliest possible");
WARN_MSG("%" PRId64 " < %" PRId64, startUnix, unixStreamBegin);
WARN_MSG("Start time (%" PRId64 ") is earlier than stream begin (%" PRId64 ") - starting earliest possible", startUnix, unixStreamBegin);
targetParams["start"] = "-1";
}else{
targetParams["start"] = JSON::Value((startUnix - unixStreamBegin) * 1000).asString();
@ -1145,20 +1130,20 @@ namespace Mist{
if (targetParams.count("start") && atoll(targetParams["start"].c_str()) != 0){
size_t mainTrack = getMainSelectedTrack();
int64_t startRec = atoll(targetParams["start"].c_str());
if (startRec > M.getLastms(mainTrack)){
if (startRec > M.getNowms(mainTrack)){
if (!M.getLive()){
onFail("Playback start past end of non-live source", true);
return;
}
int64_t streamAvail = M.getLastms(mainTrack);
int64_t streamAvail = M.getNowms(mainTrack);
int64_t lastUpdated = Util::getMS();
INFO_MSG("Waiting for stream to reach playback starting point. Current last ms is '%" PRIu64 "'", streamAvail);
while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail && keepGoing()){
Util::sleep(500);
if (M.getLastms(mainTrack) > streamAvail){
if (M.getNowms(mainTrack) > streamAvail){
HIGH_MSG("Waiting for stream to reach playback starting point. Current last ms is '%" PRIu64 "'", streamAvail);
stats();
streamAvail = M.getLastms(mainTrack);
streamAvail = M.getNowms(mainTrack);
lastUpdated = Util::getMS();
}
}
@ -1309,12 +1294,12 @@ namespace Mist{
bool Output::reachedPlannedStop(){
// If we're recording to file and reached the target position, stop
if (isRecordingToFile && targetParams.count("recstop") &&
atoll(targetParams["recstop"].c_str()) <= lastPacketTime){
atoll(targetParams["recstop"].c_str()) <= thisTime){
INFO_MSG("End of planned recording reached");
return true;
}
// Regardless of playback method, if we've reached the wanted stop point, stop
if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) <= lastPacketTime){
if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) <= thisTime){
INFO_MSG("End of planned playback reached");
return true;
}
@ -1331,7 +1316,7 @@ namespace Mist{
return false;
}
// is this a split point?
if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= lastPacketTime){
if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= thisTime){
INFO_MSG("Split point reached");
return true;
}
@ -1610,10 +1595,11 @@ namespace Mist{
// slow down processing, if real time speed is wanted
if (realTime && buffer.getSyncMode()){
uint8_t i = 6;
while (--i && thisPacket.getTime() > (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead) &&
uint64_t amount = thisTime - targetTime();
size_t i = (amount / 1000) + 6;
while (--i && thisTime > targetTime() &&
keepGoing()){
uint64_t amount = thisPacket.getTime() - (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead);
amount = thisTime - targetTime();
if (amount > 1000){amount = 1000;}
idleTime(amount);
//Make sure we stay responsive to requests and stats while waiting
@ -1633,23 +1619,23 @@ namespace Mist{
// wait at most double the look ahead time, plus ten seconds
uint64_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000 / sleepTime);
uint64_t needsTime = thisTime + needsLookAhead;
bool firstTime = true;
bool firstLookahead = true;
while (--timeoutTries && keepGoing()){
bool lookReady = true;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin();
it != userSelect.end(); it++){
if (meta.getLastms(it->first) <= needsTime){
if (meta.getNowms(it->first) <= needsTime){
if (timeoutTries == 1){
WARN_MSG("Track %zu: %" PRIu64 " <= %" PRIu64, it->first,
meta.getLastms(it->first), needsTime);
meta.getNowms(it->first), needsTime);
}
lookReady = false;
break;
}
}
if (lookReady){break;}
if (firstTime){
firstTime = false;
if (firstLookahead){
firstLookahead = false;
}else{
playbackSleep(sleepTime);
}
@ -1760,7 +1746,7 @@ namespace Mist{
onFinish();
break;
}
uint64_t endRec = lastPacketTime + atoll(targetParams["split"].c_str()) * 1000;
uint64_t endRec = thisTime + atoll(targetParams["split"].c_str()) * 1000;
targetParams["nxt-split"] = JSON::Value(endRec).asString();
sentHeader = false;
sendHeader();
@ -1987,24 +1973,34 @@ namespace Mist{
return false;
}
// if we're going to read past the end of the data page, load the next page
// this only happens for VoD
// if we're going to read past the end of the data page...
if (nxt.offset >= curPage[nxt.tid].len ||
(!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){
if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){
// For non-live, we may have just reached the end of the track. That's normal and fine, drop it.
if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){
dropTrack(nxt.tid, "end of VoD track reached", false);
return false;
}
// Check if there is a next page for the timestamp we're looking for.
if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){
loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time));
nxt.offset = 0;
//Only read the next time if the page load succeeded and there is a packet to read from
if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){
nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0);
nxt.ghostPacket = false;
}else{
nxt.ghostPacket = true;
}
buffer.replaceFirst(nxt);
return false;
}
// We're still on the same page; ghost packets should update their time and retry later
if (nxt.ghostPacket){
nxt.time = M.getNowms(nxt.tid);
buffer.replaceFirst(nxt);
return false;
}
if (nxt.offset >= curPage[nxt.tid].len){
INFO_MSG("Reading past end of page %s: %" PRIu64 " > %" PRIu64 " for time %" PRIu64 " on track %zu", curPage[nxt.tid].name.c_str(), nxt.offset, curPage[nxt.tid].len, nxt.time, nxt.tid);
dropTrack(nxt.tid, "reading past end of page");
@ -2016,6 +2012,7 @@ namespace Mist{
}
// We know this packet will be valid, pre-load it so we know its length
DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true);
nxt.time = preLoad.getTime();
nextTime = 0;
@ -2042,6 +2039,7 @@ namespace Mist{
if (!M.getLive() && nxt.time >= M.getLastms(nxt.tid)){
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
thisIdx = nxt.tid;
thisTime = nxt.time;
dropTrack(nxt.tid, "end of non-live track reached", false);
return true;
}
@ -2068,6 +2066,9 @@ namespace Mist{
break;//Valid packet!
}
// Force valid packet if nowMs is higher than current packet time
if (M.getNowms(nxt.tid) > nxt.time){break;}
//Okay, there's no next page yet, and no next packet on this page either.
//That means we're waiting for data to show up, somewhere.
@ -2118,7 +2119,7 @@ namespace Mist{
}
emptyCount = 0; // valid packet - reset empty counter
thisIdx = nxt.tid;
thisTime = thisPacket.getTime();
thisTime = nxt.time;
if (!userSelect[nxt.tid]){
dropTrack(nxt.tid, "track is not alive!");
@ -2134,7 +2135,14 @@ namespace Mist{
// we assume the next packet is the next on this same page
nxt.offset += thisPacket.getDataLen();
nxt.time = nextTime;
if (!nextTime){
// If time is not known yet, insert a ghostPacket with a known safe time
nxt.time = M.getNowms(nxt.tid);
nxt.ghostPacket = true;
}else{
nxt.time = nextTime;
nxt.ghostPacket = false;
}
++nxt.partIndex;
// exchange the current packet in the buffer for the next one
@ -2425,11 +2433,11 @@ namespace Mist{
uint64_t oneTime = 0;
uint64_t twoTime = 0;
for (std::set<size_t>::iterator it = vTracks.begin(); it != vTracks.end(); ++it){
if (M.getLastms(*it) > oneTime){oneTime = M.getLastms(*it);}
if (M.getNowms(*it) > oneTime){oneTime = M.getNowms(*it);}
}
Util::wait(2000);
for (std::set<size_t>::iterator it = vTracks.begin(); it != vTracks.end(); ++it){
if (M.getLastms(*it) > twoTime){twoTime = M.getLastms(*it);}
if (M.getNowms(*it) > twoTime){twoTime = M.getNowms(*it);}
}
if (twoTime <= oneTime+500){
disconnect();

View file

@ -42,6 +42,7 @@ namespace Mist{
uint64_t currentTime();
uint64_t startTime();
uint64_t endTime();
uint64_t targetTime();
void setBlocking(bool blocking);
bool selectDefaultTracks();
bool connectToFile(std::string file, bool append = false, Socket::Connection *conn = 0);

View file

@ -10,7 +10,15 @@
namespace Mist{
HTTPOutput::HTTPOutput(Socket::Connection &conn) : Output(conn){
//Websocket related
webSock = 0;
wsCmds = false;
stayLive = true;
target_rate = 0.0;
forwardTo = 0;
prevVidTrack = INVALID_TRACK_ID;
//General
idleInterval = 0;
idleLast = 0;
if (config->getString("ip").size()){myConn.setHost(config->getString("ip"));}
@ -179,16 +187,84 @@ namespace Mist{
return "";
}
bool HTTPOutput::onFinish(){
//If we're in the middle of sending a chunked reply, finish it cleanly and get read for the next request
if (!webSock && H.sendingChunks){
H.Chunkify(0, 0, myConn);
wantRequest = true;
return true;
}
//If we're a websocket and handling commands, finish it cleanly too
if (webSock && wsCmds){
JSON::Value r;
r["type"] = "on_stop";
r["data"]["current"] = currentTime();
r["data"]["begin"] = Output::startTime();
r["data"]["end"] = Output::endTime();
webSock->sendFrame(r.toString());
parseData = false;
return false;
}
//All other cases call the parent finish handler
return Output::onFinish();
}
void HTTPOutput::sendNext(){
//If we're not in websocket mode and handling commands, we do nothing here
if (!wsCmds || !webSock){return;}
//Finish fast-forwarding if forwardTo time was reached
if (forwardTo && thisTime >= forwardTo){
forwardTo = 0;
if (target_rate == 0.0){
realTime = 1000;//set playback speed to default
firstTime = Util::bootMS() - thisTime;
maxSkipAhead = 0;//enable automatic rate control
}else{
stayLive = false;
//Set new realTime speed
realTime = 1000 / target_rate;
firstTime = Util::bootMS() - (thisTime / target_rate);
maxSkipAhead = 1;//disable automatic rate control
}
JSON::Value r;
r["type"] = "set_speed";
r["data"]["play_rate_prev"] = "fast-forward";
if (target_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = target_rate;
}
webSock->sendFrame(r.toString());
}
// Handle nice move-over to new main video track
if (prevVidTrack != INVALID_TRACK_ID && thisIdx != prevVidTrack && M.getType(thisIdx) == "video"){
if (!thisPacket.getFlag("keyframe")){
// Ignore the packet if not a keyframe
return;
}
dropTrack(prevVidTrack, "Smoothly switching to new video track", false);
prevVidTrack = INVALID_TRACK_ID;
handleWebsocketIdle();
onIdle();
sendHeader();
}
}
void HTTPOutput::requestHandler(){
// Handle onIdle function caller, if needed
if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){
if (wsCmds){handleWebsocketIdle();}
onIdle();
idleLast = Util::bootMS();
}
// Handle websockets
if (webSock){
if (webSock->readFrame()){
onWebsocketFrame();
if (!wsCmds || !handleWebsocketCommands()){
onWebsocketFrame();
}
idleLast = Util::bootMS();
return;
}
@ -270,6 +346,7 @@ namespace Mist{
if (H.GetVar("audio") != ""){targetParams["audio"] = H.GetVar("audio");}
if (H.GetVar("video") != ""){targetParams["video"] = H.GetVar("video");}
if (H.GetVar("meta") != ""){targetParams["meta"] = H.GetVar("meta");}
if (H.GetVar("subtitle") != ""){targetParams["subtitle"] = H.GetVar("subtitle");}
if (H.GetVar("start") != ""){targetParams["start"] = H.GetVar("start");}
if (H.GetVar("stop") != ""){targetParams["stop"] = H.GetVar("stop");}
@ -314,6 +391,13 @@ namespace Mist{
webSock = 0;
return;
}
//Generic websocket handling sets idle interval to 1s and changes name by appending "/WS"
if (wsCmds){
idleInterval = 1000;
if (capa["name"].asStringRef().find("/WS") != std::string::npos){
capa["name"] = capa["name"].asStringRef() + "/WS";
}
}
onWebsocketConnect();
H.Clean();
return;
@ -331,6 +415,361 @@ namespace Mist{
if (!sawRequest && !myConn.spool() && !isBlocking && !parseData){Util::sleep(100);}
}
/// Handles standardized WebSocket commands.
/// Returns true if a command was executed, false otherwise.
bool HTTPOutput::handleWebsocketCommands(){
//only handle text frames
if (webSock->frameType != 1){return false;}
//Parse JSON and check command type
JSON::Value command = JSON::fromString(webSock->data, webSock->data.size());
if (!command || !command.isMember("type")){return false;}
//Seek command, for changing playback position
if (command["type"] == "seek") {
handleWebsocketSeek(command);
return true;
}
//Pause command, toggles pause state
if (command["type"] == "pause") {
parseData = !parseData;
JSON::Value r;
r["type"] = "pause";
r["paused"] = !parseData;
if (!parseData){
//Store current target time into lastPacketTime when pausing
lastPacketTime = targetTime();
}else{
//On resume, restore the timing to be where it was when pausing
firstTime = Util::bootMS() - (lastPacketTime / target_rate);
}
webSock->sendFrame(r.toString());
return true;
}
//Hold command, forces pause state on
if (command["type"] == "hold") {
if (parseData){
//Store current target time into lastPacketTime when pausing
lastPacketTime = targetTime();
}
parseData = false;
webSock->sendFrame("{\"type\":\"pause\",\"paused\":true}");
return true;
}
//Tracks command, for (re)selecting tracks
if (command["type"] == "tracks") {
if (command.isMember("audio")){
if (!command["audio"].isNull() && command["audio"] != "auto"){
targetParams["audio"] = command["audio"].asString();
}else{
targetParams.erase("audio");
}
}
if (command.isMember("video")){
if (!command["video"].isNull() && command["video"] != "auto"){
targetParams["video"] = command["video"].asString();
}else{
targetParams.erase("video");
}
}
if (command.isMember("meta")){
if (!command["meta"].isNull() && command["meta"] != "auto"){
targetParams["meta"] = command["meta"].asString();
}else{
targetParams.erase("meta");
}
}
if (command.isMember("seek_time")){
possiblyReselectTracks(command["seek_time"].asInt());
}else{
possiblyReselectTracks(currentTime());
}
return true;
}
//Fast_forward command, fast-forwards to given timestamp and resume previous speed
if (command["type"] == "fast_forward"){
if (command.isMember("ff_to")){
forwardTo = command["ff_to"].asInt();
if (forwardTo > currentTime()){
realTime = 0;
}else{
if (target_rate == 0.0){
firstTime = Util::bootMS() - forwardTo;
}else{
firstTime = Util::bootMS() - (forwardTo / target_rate);
}
forwardTo = 0;
}
}else{
JSON::Value r;
r["type"] = "warning";
r["warning"] = "Ignored fast_forward command: ff_to property missing";
webSock->sendFrame(r.toString());
}
onIdle();
return true;
}
//Set_speed command, changes playback speed
if (command["type"] == "set_speed") {
JSON::Value r;
r["type"] = "set_speed";
if (!command.isMember("play_rate")){
r["error"] = "play_rate missing";
webSock->sendFrame(r.toString());
return false;
}
double set_rate = command["play_rate"].asDouble();
if (!parseData){
parseData = true;
selectDefaultTracks();
}
if (target_rate == 0.0){
r["data"]["play_rate_prev"] = "auto";
}else{
r["data"]["play_rate_prev"] = target_rate;
}
if (set_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = set_rate;
}
if (target_rate != set_rate){
uint64_t prevTargetTime = targetTime();
target_rate = set_rate;
if (target_rate == 0.0){
realTime = 1000;//set playback speed to default
firstTime = Util::bootMS() - prevTargetTime;
maxSkipAhead = 0;//enabled automatic rate control
}else{
stayLive = false;
//Set new realTime speed
realTime = 1000 / target_rate;
firstTime = Util::bootMS() - (prevTargetTime / target_rate);
maxSkipAhead = 1;//disable automatic rate control
}
}
if (M.getLive()){r["data"]["live_point"] = stayLive;}
webSock->sendFrame(r.toString());
handleWebsocketIdle();
onIdle();
return true;
}
//Stop command, ends playback and disconnects the socket explicitly
if (command["type"] == "stop") {
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "User requested stop");
myConn.close();
return true;
}
//Play command, sets pause state off and optionally also seeks
if (command["type"] == "play") {
parseData = true;
if (command.isMember("seek_time")){
handleWebsocketSeek(command);
}else{
if (!currentTime()){
command["seek_time"] = 0;
handleWebsocketSeek(command);
}else{
parseData = true;
selectDefaultTracks();
firstTime = Util::bootMS() - (lastPacketTime / target_rate);
}
}
return true;
}
//Unhandled commands end up here
return false;
}
void HTTPOutput::handleWebsocketIdle(){
if (!webSock){return;}
if (!parseData){return;}
//Finish fast-forwarding if forwardTo time was reached
if (forwardTo && targetTime() >= forwardTo){
forwardTo = 0;
if (target_rate == 0.0){
realTime = 1000;//set playback speed to default
firstTime = Util::bootMS() - targetTime();
maxSkipAhead = 0;//enable automatic rate control
}else{
stayLive = false;
//Set new realTime speed
realTime = 1000 / target_rate;
firstTime = Util::bootMS() - (targetTime() / target_rate);
maxSkipAhead = 1;//disable automatic rate control
}
JSON::Value r;
r["type"] = "set_speed";
r["data"]["play_rate_prev"] = "fast-forward";
if (target_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = target_rate;
}
webSock->sendFrame(r.toString());
}
JSON::Value r;
r["type"] = "on_time";
r["data"]["current"] = targetTime();
r["data"]["next"] = currentTime();
r["data"]["begin"] = Output::startTime();
r["data"]["end"] = Output::endTime();
if (realTime == 0){
r["data"]["play_rate_curr"] = "fast-forward";
}else{
if (target_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = target_rate;
}
}
uint64_t jitter = 0;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
r["data"]["tracks"].append((uint64_t)it->first);
if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);}
}
r["data"]["jitter"] = jitter;
if (M.getLive() && dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;}
if (capa.isMember("maxdelay") && capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;}
webSock->sendFrame(r.toString());
}
bool HTTPOutput::possiblyReselectTracks(uint64_t seekTarget){
// Remember the previous video track, if any.
std::set<size_t> prevSelTracks;
prevVidTrack = INVALID_TRACK_ID;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
prevSelTracks.insert(it->first);
if (M.getType(it->first) == "video"){
prevVidTrack = it->first;
}
}
if (!selectDefaultTracks()) {
prevVidTrack = INVALID_TRACK_ID;
handleWebsocketIdle();
onIdle();
return false;
}
if (seekTarget != currentTime()){prevVidTrack = INVALID_TRACK_ID;}
bool hasVideo = false;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (M.getType(it->first) == "video"){hasVideo = true;}
}
// Add the previous video track back, if we had one.
if (prevVidTrack != INVALID_TRACK_ID && !userSelect.count(prevVidTrack) && hasVideo){
userSelect[prevVidTrack].reload(streamName, prevVidTrack);
seek(seekTarget);
std::set<size_t> newSelTracks;
newSelTracks.insert(prevVidTrack);
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (M.getType(it->first) != "video"){
newSelTracks.insert(it->first);
}
}
if (prevSelTracks != newSelTracks){
seek(seekTarget, true);
realTime = 0;
forwardTo = seekTarget;
sendHeader();
JSON::Value r;
r["type"] = "set_speed";
if (target_rate == 0.0){
r["data"]["play_rate_prev"] = "auto";
}else{
r["data"]["play_rate_prev"] = target_rate;
}
r["data"]["play_rate_curr"] = "fast-forward";
webSock->sendFrame(r.toString());
}
}else{
prevVidTrack = INVALID_TRACK_ID;
seek(seekTarget, true);
realTime = 0;
forwardTo = seekTarget;
sendHeader();
JSON::Value r;
r["type"] = "set_speed";
if (target_rate == 0.0){
r["data"]["play_rate_prev"] = "auto";
}else{
r["data"]["play_rate_prev"] = target_rate;
}
r["data"]["play_rate_curr"] = "fast-forward";
webSock->sendFrame(r.toString());
}
handleWebsocketIdle();
onIdle();
return true;
}
bool HTTPOutput::handleWebsocketSeek(const JSON::Value& command) {
JSON::Value r;
r["type"] = "seek";
if (!command.isMember("seek_time")){
r["error"] = "seek_time missing";
webSock->sendFrame(r.toString());
return false;
}
uint64_t seek_time = command["seek_time"].asInt();
if (!parseData){
parseData = true;
selectDefaultTracks();
}
stayLive = (target_rate == 0.0) && (Output::endTime() < seek_time + 5000);
if (command["seek_time"].asStringRef() == "live"){stayLive = true;}
if (stayLive){seek_time = Output::endTime();}
if (!seek(seek_time, true)) {
r["error"] = "seek failed, continuing as-is";
webSock->sendFrame(r.toString());
return false;
}
if (M.getLive()){r["data"]["live_point"] = stayLive;}
if (target_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = target_rate;
}
if (command.isMember("ff_to") || (seek_time >= 250 && currentTime() < seek_time - 250)){
forwardTo = seek_time;
if (command.isMember("ff_to") && command["ff_to"].asInt() > forwardTo){
forwardTo = command["ff_to"].asInt();
}
if (forwardTo < currentTime()){
if (target_rate == 0.0){
firstTime = Util::bootMS() - forwardTo;
}else{
firstTime = Util::bootMS() - (forwardTo / target_rate);
}
forwardTo = 0;
}else{
realTime = 0;
r["data"]["play_rate_curr"] = "fast-forward";
}
}
handleWebsocketIdle();
onIdle();
webSock->sendFrame(r.toString());
return true;
}
/// Default HTTP handler.
/// Only takes care of OPTIONS and HEAD, saving the original request, and calling respondHTTP
void HTTPOutput::onHTTP(){

View file

@ -15,18 +15,32 @@ namespace Mist{
virtual void onHTTP();
virtual void respondHTTP(const HTTP::Parser & req, bool headersOnly);
virtual void onIdle(){};
virtual void onWebsocketFrame(){};
virtual void onWebsocketConnect(){};
virtual void preWebsocketConnect(){};
virtual void requestHandler();
virtual void preHTTP();
virtual bool onFinish();
virtual void sendNext();
static bool listenMode(){return false;}
virtual bool doesWebsockets(){return false;}
void reConnector(std::string &connector);
std::string getHandler();
bool parseRange(std::string header, uint64_t &byteStart, uint64_t &byteEnd);
//WebSocket related
virtual bool doesWebsockets(){return false;}
virtual void onWebsocketFrame(){};
virtual void onWebsocketConnect(){};
virtual void preWebsocketConnect(){};
bool handleWebsocketCommands();
void handleWebsocketIdle();
bool handleWebsocketSeek(const JSON::Value & command);
bool possiblyReselectTracks(uint64_t seekTarget);
protected:
//WebSocket related
bool wsCmds; ///< If true, implements all our standard websocket-based seek/play/etc commands
double target_rate; ///< Target playback speed rate (1.0 = normal, 0 = auto)
uint64_t forwardTo; ///< Playback position we're fast-forwarding towards
size_t prevVidTrack; ///< Previously selected main video track
bool stayLive; ///< Whether or not we're trying to stay on the live-most point, for live streams
bool responded;
HTTP::Parser H;
HTTP::Websocket *webSock;

View file

@ -9,6 +9,8 @@
#include <mist/websocket.h>
#include <sys/stat.h>
bool includeZeroMatches = false;
namespace Mist{
/// Helper function to find the protocol entry for a given port number
std::string getProtocolForPort(uint16_t portNo){
@ -647,7 +649,7 @@ namespace Mist{
// loop over the added sources, add them to json_resp["sources"]
for (std::set<JSON::Value, sourceCompare>::iterator it = sources.begin(); it != sources.end(); it++){
if ((*it)["simul_tracks"].asInt() > 0){
if (includeZeroMatches || (*it)["simul_tracks"].asInt() > 0){
if (Comms::tknMode & 0x04){
JSON::Value tmp;
tmp = (*it);
@ -664,6 +666,7 @@ namespace Mist{
void OutHTTP::respondHTTP(const HTTP::Parser & req, bool headersOnly){
origStreamName = streamName;
includeZeroMatches = req.GetVar("inclzero").size();
if (req.GetHeader("X-Mst-Path").size()){mistPath = req.GetHeader("X-Mst-Path");}

View file

@ -74,6 +74,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("+AC3");
capa["codecs"][0u][1u].append("+MP2");
capa["codecs"][0u][1u].append("+opus");
capa["codecs"][0u][2u].append("+JSON");
capa["codecs"][1u][0u].append("rawts");
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "html5/video/mpeg";

View file

@ -5,6 +5,7 @@
namespace Mist{
OutJSON::OutJSON(Socket::Connection &conn) : HTTPOutput(conn){
wsCmds = true;
realTime = 0;
bootMsOffset = 0;
keepReselecting = false;
@ -19,7 +20,8 @@ namespace Mist{
capa["friendly"] = "JSON over HTTP";
capa["desc"] = "Pseudostreaming in JSON format over HTTP";
capa["url_match"] = "/$.json";
capa["codecs"][0u][0u].append("@+meta");
capa["codecs"][0u][0u].append("@meta");
capa["codecs"][0u][0u].append("subtitle");
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "html5/text/javascript";
capa["methods"][0u]["hrn"] = "JSON progressive";
@ -33,6 +35,9 @@ namespace Mist{
}
void OutJSON::sendNext(){
//Call parent handler for generic websocket handling
HTTPOutput::sendNext();
if (keepReselecting){
// If we can select more tracks, do it and continue.
if (selectDefaultTracks()){
@ -44,11 +49,31 @@ namespace Mist{
char *dPtr;
size_t dLen;
thisPacket.getString("data", dPtr, dLen);
if (dLen == 0 || (dLen == 1 && dPtr[0] == ' ')){return;}
jPack["data"] = JSON::fromString(dPtr, dLen);
jPack["time"] = thisPacket.getTime();
jPack["time"] = thisTime;
jPack["track"] = (uint64_t)thisIdx;
}else if (M.getCodec(thisIdx) == "subtitle"){
char *dPtr;
size_t dLen;
thisPacket.getString("data", dPtr, dLen);
//Ignore blank subtitles
if (dLen == 0 || (dLen == 1 && dPtr[0] == ' ')){return;}
//Get duration, or calculate if missing
uint64_t duration = thisPacket.getInt("duration");
if (!duration){duration = dLen * 75 + 800;}
//Build JSON data to transmit
jPack["duration"] = duration;
jPack["time"] = thisTime;
jPack["track"] = (uint64_t)thisIdx;
jPack["data"] = std::string(dPtr, dLen);
}else{
jPack = thisPacket.toJSON();
jPack.removeMember("bpos");
jPack["generic_converter_used"] = true;
}
if (dupcheck){
if (jPack.compareExcept(lastVal, nodup)){
@ -75,13 +100,14 @@ namespace Mist{
}
void OutJSON::sendHeader(){
sentHeader = true;
if (webSock){return;}
std::string method = H.method;
H.Clean();
H.SetHeader("Content-Type", "text/javascript");
H.protocol = "HTTP/1.0";
H.setCORSHeaders();
H.SendResponse("200", "OK", myConn);
sentHeader = true;
}
void OutJSON::onFail(const std::string &msg, bool critical){
@ -203,9 +229,11 @@ namespace Mist{
/// Repeats last JSON packet every 5 seconds to keep stream alive.
void OutJSON::onIdle(){
lastOutTime += (Util::bootMS() - lastSendTime);
lastSendTime = Util::bootMS();
bufferLivePacket(lastOutTime, 0, pushTrack, lastOutData.data(), lastOutData.size(), 0, true);
if (isPushing()){
lastOutTime += (Util::bootMS() - lastSendTime);
lastSendTime = Util::bootMS();
bufferLivePacket(lastOutTime, 0, pushTrack, lastOutData.data(), lastOutData.size(), 0, true);
}
}
void OutJSON::onHTTP(){

View file

@ -0,0 +1,112 @@
#include "output_jsonline.h"
#include <mist/defines.h>
#include <mist/http_parser.h>
#include <mist/url.h>
#include <mist/triggers.h>
#include <mist/stream.h>
namespace Mist{
OutJSONLine::OutJSONLine(Socket::Connection &conn) : Output(conn){
trkIdx = INVALID_TRACK_ID;
streamName = config->getString("streamname");
wantRequest = true;
parseData = false;
if (Triggers::shouldTrigger("PUSH_REWRITE")){
std::string payload = "jsonline://" + myConn.getBoundAddress() + ":" + config->getOption("port").asString() + "\n" + getConnectedHost() + "\n" + streamName;
std::string newStream = streamName;
Triggers::doTrigger("PUSH_REWRITE", payload, "", false, newStream);
if (!newStream.size()){
FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.c_str());
config->is_active = false;
return;
}else{
streamName = newStream;
Util::sanitizeName(streamName);
Util::setStreamName(streamName);
}
}
if (!allowPush("")){
FAIL_MSG("Pushing not allowed");
config->is_active = false;
return;
}
initialize();
trkIdx = meta.addTrack();
meta.setType(trkIdx, "meta");
meta.setCodec(trkIdx, config->getString("codec"));
meta.setID(trkIdx, 1);
offset = M.getBootMsOffset();
myConn.setBlocking(false);
INFO_MSG("%s track index is %zu", config->getString("codec").c_str(), trkIdx);
}
OutJSONLine::~OutJSONLine(){
if (trkIdx != INVALID_TRACK_ID && M){
meta.abandonTrack(trkIdx);
}
}
void OutJSONLine::init(Util::Config *cfg){
Output::init(cfg);
capa["name"] = "JSONLine";
capa["friendly"] = "JSON lines over raw TCP";
capa["desc"] = "Real time JSON line-by-line input over a raw TCP socket or standard input";
capa["deps"] = "";
capa["required"]["streamname"]["name"] = "Stream";
capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add "
"this protocol multiple times using different ports.";
capa["required"]["streamname"]["type"] = "str";
capa["required"]["streamname"]["option"] = "--stream";
capa["required"]["streamname"]["short"] = "s";
cfg->addOption("codec",
JSON::fromString("{\"arg\":\"string\",\"default\":\"JSON\",\"short\":\"c\",\"long\":"
"\"codec\",\"help\":\"Codec to use for data ingest, JSON by default\"}"));
capa["optional"]["codec"]["name"] = "Codec";
capa["optional"]["codec"]["help"] = "What codec to ingest as";
capa["optional"]["codec"]["default"] = "JSON";
capa["optional"]["codec"]["type"] = "str";
capa["optional"]["codec"]["option"] = "--codec";
capa["optional"]["codec"]["short"] = "c";
capa["codecs"][0u][0u].append("JSON");
cfg->addConnectorOptions(3456, capa);
config = cfg;
}
void OutJSONLine::sendNext(){
}
bool OutJSONLine::listenMode(){return true;}
void OutJSONLine::requestHandler(){
if (myConn.spool()){
while (myConn.Received().size()){
dPtr.append(myConn.Received().get());
myConn.Received().get().clear();
if (dPtr.size() && dPtr[dPtr.size() - 1] == '\n'){
thisTime = Util::bootMS() - offset;
thisIdx = trkIdx;
thisPacket.genericFill(thisTime, 0, 1, dPtr, dPtr.size(), 0, true);
bufferLivePacket(thisPacket);
dPtr.truncate(0);
}
}
}else{
meta.setNowms(trkIdx, Util::bootMS() - offset);
Util::sleep(10);
}
}
std::string OutJSONLine::getStatsName(){
if (!parseData){
return "INPUT:" + capa["name"].asStringRef();
}else{
return Output::getStatsName();
}
}
bool OutJSONLine::isReadyForPlay(){return true;}
}// namespace Mist

View file

@ -0,0 +1,26 @@
#include "output.h"
namespace Mist{
class OutJSONLine : public Output{
public:
OutJSONLine(Socket::Connection &conn);
~OutJSONLine();
static void init(Util::Config *cfg);
void sendNext();
static bool listenMode();
bool isReadyForPlay();
void requestHandler();
private:
std::string getStatsName();
Util::ResizeablePointer dPtr;
size_t trkIdx;
uint64_t offset;
protected:
inline virtual bool keepGoing(){
return config->is_active && (!listenMode() || myConn);
}
};
}// namespace Mist
typedef Mist::OutJSONLine mistOut;

View file

@ -106,14 +106,11 @@ namespace Mist{
}
OutMP4::OutMP4(Socket::Connection &conn) : HTTPOutput(conn){
prevVidTrack = INVALID_TRACK_ID;
wsCmds = true;
nextHeaderTime = 0xffffffffffffffffull;
startTime = 0;
endTime = 0xffffffffffffffffull;
realBaseOffset = 1;
stayLive = true;
target_rate = 0.0;
forwardTo = 0;
}
OutMP4::~OutMP4(){}
@ -1197,12 +1194,8 @@ namespace Mist{
}
void OutMP4::sendNext(){
if (!thisPacket.getData()) {
FAIL_MSG("`thisPacket.getData()` is invalid.");
return;
}
//Call parent handler for generic websocket handling
HTTPOutput::sendNext();
// Obtain a pointer to the data of this packet
char *dataPointer = 0;
size_t len = 0;
@ -1210,68 +1203,6 @@ namespace Mist{
// WebSockets send each packet directly. The packet is constructed in `appendSinglePacketMoof()`.
if (webSock) {
if (forwardTo && currentTime() >= forwardTo){
forwardTo = 0;
if (target_rate == 0.0){
realTime = 1000;//set playback speed to default
firstTime = Util::bootMS() - currentTime();
maxSkipAhead = 0;//enabled automatic rate control
}else{
stayLive = false;
//Set new realTime speed
realTime = 1000 / target_rate;
firstTime = Util::bootMS() - (currentTime() / target_rate);
maxSkipAhead = 1;//disable automatic rate control
}
JSON::Value r;
r["type"] = "set_speed";
r["data"]["play_rate_prev"] = "fast-forward";
if (target_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = target_rate;
}
webSock->sendFrame(r.toString());
}
// Handle nice move-over to new track ID
if (prevVidTrack != INVALID_TRACK_ID && thisIdx != prevVidTrack && M.getType(thisIdx) == "video"){
if (!thisPacket.getFlag("keyframe")){
// Ignore the packet if not a keyframe
return;
}
dropTrack(prevVidTrack, "Smoothly switching to new video track", false);
prevVidTrack = INVALID_TRACK_ID;
onIdle();
sendHeader();
/*
MP4::AVCC avccbox;
avccbox.setPayload(M.getInit(thisIdx));
std::string bs = avccbox.asAnnexB();
static Util::ResizeablePointer initBuf;
initBuf.assign(0,0);
initBuf.allocate(bs.size());
char * ib = initBuf;
initBuf.append(0, nalu::fromAnnexB(bs.data(), bs.size(), ib));
webBuf.truncate(0);
appendSinglePacketMoof(webBuf, bs.size());
char mdatHeader[8] ={0x00, 0x00, 0x00, 0x00, 'm', 'd', 'a', 't'};
Bit::htobl(mdatHeader, 8 + len); //8 bytes for the header + length of data.
webBuf.append(mdatHeader, 8);
webBuf.append(dataPointer, len);
webBuf.append(initBuf, initBuf.size());
webSock->sendFrame(webBuf, webBuf.size(), 2);
return;
*/
}
webBuf.truncate(0);
appendSinglePacketMoof(webBuf);
@ -1415,9 +1346,6 @@ namespace Mist{
}
webSock->sendFrame(headerData, headerData.size(), 2);
std::ofstream bleh("/tmp/bleh.mp4");
bleh.write(headerData, headerData.size());
bleh.close();
sentHeader = true;
return;
}
@ -1448,24 +1376,15 @@ namespace Mist{
}
void OutMP4::onWebsocketConnect() {
capa["name"] = "MP4/WS";
capa["maxdelay"] = 5000;
fragSeqNum = 0;
idleInterval = 1000;
maxSkipAhead = 0;
if (M.getLive()){dataWaitTimeout = 450;}
}
void OutMP4::onWebsocketFrame() {
JSON::Value command = JSON::fromString(webSock->data, webSock->data.size());
if (!command.isMember("type")) {
JSON::Value r;
r["type"] = "error";
r["data"] = "type field missing from command";
webSock->sendFrame(r.toString());
return;
}
if (!command.isMember("type")) {return;}
if (command["type"] == "request_codec_data") {
//If no supported codecs are passed, assume autodetected capabilities
@ -1490,119 +1409,10 @@ namespace Mist{
selectDefaultTracks();
initialSeek();
sendHeader();
}else if (command["type"] == "seek") {
handleWebsocketSeek(command);
}else if (command["type"] == "pause") {
parseData = !parseData;
JSON::Value r;
r["type"] = "pause";
r["paused"] = !parseData;
//Make sure we reset our timing code, too
if (parseData){
firstTime = Util::bootMS() - (currentTime() / target_rate);
}
webSock->sendFrame(r.toString());
}else if (command["type"] == "hold") {
parseData = false;
webSock->sendFrame("{\"type\":\"pause\",\"paused\":true}");
}else if (command["type"] == "tracks") {
if (command.isMember("audio")){
if (!command["audio"].isNull() && command["audio"] != "auto"){
targetParams["audio"] = command["audio"].asString();
}else{
targetParams.erase("audio");
}
}
if (command.isMember("video")){
if (!command["video"].isNull() && command["video"] != "auto"){
targetParams["video"] = command["video"].asString();
}else{
targetParams.erase("video");
}
}
if (command.isMember("seek_time")){
possiblyReselectTracks(command["seek_time"].asInt());
}else{
possiblyReselectTracks(currentTime());
}
return;
}else if (command["type"] == "set_speed") {
handleWebsocketSetSpeed(command);
}else if (command["type"] == "stop") {
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "User requested stop");
myConn.close();
}else if (command["type"] == "play") {
parseData = true;
if (command.isMember("seek_time")){handleWebsocketSeek(command);}
}
}
bool OutMP4::possiblyReselectTracks(uint64_t seekTarget){
// Remember the previous video track, if any.
std::set<size_t> prevSelTracks;
prevVidTrack = INVALID_TRACK_ID;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
prevSelTracks.insert(it->first);
if (M.getType(it->first) == "video"){
prevVidTrack = it->first;
}
}
if (!selectDefaultTracks()) {
prevVidTrack = INVALID_TRACK_ID;
onIdle();
return false;
}
if (seekTarget != currentTime()){prevVidTrack = INVALID_TRACK_ID;}
bool hasVideo = false;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (M.getType(it->first) == "video"){hasVideo = true;}
}
// Add the previous video track back, if we had one.
if (prevVidTrack != INVALID_TRACK_ID && !userSelect.count(prevVidTrack) && hasVideo){
userSelect[prevVidTrack].reload(streamName, prevVidTrack);
seek(seekTarget);
std::set<size_t> newSelTracks;
newSelTracks.insert(prevVidTrack);
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (M.getType(it->first) != "video"){
newSelTracks.insert(it->first);
}
}
if (prevSelTracks != newSelTracks){
seek(seekTarget, true);
realTime = 0;
forwardTo = seekTarget;
sendHeader();
JSON::Value r;
r["type"] = "set_speed";
if (target_rate == 0.0){
r["data"]["play_rate_prev"] = "auto";
}else{
r["data"]["play_rate_prev"] = target_rate;
}
r["data"]["play_rate_curr"] = "fast-forward";
webSock->sendFrame(r.toString());
}
}else{
prevVidTrack = INVALID_TRACK_ID;
seek(seekTarget, true);
realTime = 0;
forwardTo = seekTarget;
sendHeader();
JSON::Value r;
r["type"] = "set_speed";
if (target_rate == 0.0){
r["data"]["play_rate_prev"] = "auto";
}else{
r["data"]["play_rate_prev"] = target_rate;
}
r["data"]["play_rate_curr"] = "fast-forward";
webSock->sendFrame(r.toString());
}
onIdle();
return true;
}
void OutMP4::sendWebsocketCodecData(const std::string& type) {
JSON::Value r;
r["type"] = type;
@ -1627,136 +1437,6 @@ namespace Mist{
webSock->sendFrame(r.toString());
}
bool OutMP4::handleWebsocketSeek(JSON::Value& command) {
JSON::Value r;
r["type"] = "seek";
if (!command.isMember("seek_time")){
r["error"] = "seek_time missing";
webSock->sendFrame(r.toString());
return false;
}
uint64_t seek_time = command["seek_time"].asInt();
if (!parseData){
parseData = true;
selectDefaultTracks();
}
stayLive = (target_rate == 0.0) && (Output::endTime() < seek_time + 5000);
if (command["seek_time"].asStringRef() == "live"){stayLive = true;}
if (stayLive){seek_time = Output::endTime();}
if (!seek(seek_time, true)) {
r["error"] = "seek failed, continuing as-is";
webSock->sendFrame(r.toString());
return false;
}
if (M.getLive()){r["data"]["live_point"] = stayLive;}
if (target_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = target_rate;
}
if (seek_time >= 250 && currentTime() < seek_time - 250){
forwardTo = seek_time;
realTime = 0;
r["data"]["play_rate_curr"] = "fast-forward";
}
onIdle();
webSock->sendFrame(r.toString());
return true;
}
bool OutMP4::handleWebsocketSetSpeed(JSON::Value& command) {
JSON::Value r;
r["type"] = "set_speed";
if (!command.isMember("play_rate")){
r["error"] = "play_rate missing";
webSock->sendFrame(r.toString());
return false;
}
double set_rate = command["play_rate"].asDouble();
if (!parseData){
parseData = true;
selectDefaultTracks();
}
if (target_rate == 0.0){
r["data"]["play_rate_prev"] = "auto";
}else{
r["data"]["play_rate_prev"] = target_rate;
}
if (set_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = set_rate;
}
if (target_rate != set_rate){
target_rate = set_rate;
if (target_rate == 0.0){
realTime = 1000;//set playback speed to default
firstTime = Util::bootMS() - currentTime();
maxSkipAhead = 0;//enabled automatic rate control
}else{
stayLive = false;
//Set new realTime speed
realTime = 1000 / target_rate;
firstTime = Util::bootMS() - (currentTime() / target_rate);
maxSkipAhead = 1;//disable automatic rate control
}
}
if (M.getLive()){r["data"]["live_point"] = stayLive;}
webSock->sendFrame(r.toString());
onIdle();
return true;
}
void OutMP4::onIdle() {
if (!webSock){return;}
if (!parseData){return;}
JSON::Value r;
r["type"] = "on_time";
r["data"]["current"] = currentTime();
r["data"]["begin"] = Output::startTime();
r["data"]["end"] = Output::endTime();
if (realTime == 0){
r["data"]["play_rate_curr"] = "fast-forward";
}else{
if (target_rate == 0.0){
r["data"]["play_rate_curr"] = "auto";
}else{
r["data"]["play_rate_curr"] = target_rate;
}
}
uint64_t jitter = 0;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
r["data"]["tracks"].append((uint64_t)it->first);
if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);}
}
r["data"]["jitter"] = jitter;
if (M.getLive() && dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;}
if (capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;}
webSock->sendFrame(r.toString());
}
bool OutMP4::onFinish() {
if (!webSock){
H.Chunkify(0, 0, myConn);
wantRequest = true;
return true;
}
JSON::Value r;
r["type"] = "on_stop";
r["data"]["current"] = currentTime();
r["data"]["begin"] = Output::startTime();
r["data"]["end"] = Output::endTime();
webSock->sendFrame(r.toString());
parseData = false;
return false;
}
void OutMP4::dropTrack(size_t trackId, const std::string &reason, bool probablyBad){
if (webSock && (reason == "EOP: data wait timeout" || reason == "disappeared from metadata") && possiblyReselectTracks(currentTime())){
return;

View file

@ -106,16 +106,10 @@ namespace Mist{
bool doesWebsockets() { return true; }
void onWebsocketConnect();
void onWebsocketFrame();
void onIdle();
virtual bool onFinish();
virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true);
protected:
bool possiblyReselectTracks(uint64_t seekTarget);
void sendWebsocketCodecData(const std::string& type);
bool handleWebsocketSeek(JSON::Value& command);
bool handleWebsocketSetSpeed(JSON::Value& command);
bool stayLive;
double target_rate; ///< Target playback speed rate (1.0 = normal, 0 = auto)
uint64_t fileSize;
uint64_t byteStart;
@ -123,11 +117,9 @@ namespace Mist{
int64_t leftOver;
uint64_t currPos;
uint64_t seekPoint;
uint64_t forwardTo;
uint64_t nextHeaderTime;
uint64_t headerSize;
size_t prevVidTrack;
// variables for standard MP4
std::set<keyPart> sortSet; // needed for unfragmented MP4, remembers the order of keyparts

View file

@ -1707,7 +1707,7 @@ namespace Mist{
size_t reTrack = next.cs_id * 3 + (F.data[0] == 0x09 ? 1 : (F.data[0] == 0x08 ? 2 : 3));
if (!reTrackToID.count(reTrack)){reTrackToID[reTrack] = INVALID_TRACK_ID;}
F.toMeta(meta, *amf_storage, reTrackToID[reTrack], targetParams);
if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){
if ((F.getDataLen() || (amf_storage && amf_storage->hasContent())) && !(F.needsInitData() && F.isInitData())){
uint64_t tagTime = next.timestamp;
uint64_t timeOffset = 0;
if (targetParams.count("timeoffset")){
@ -1778,8 +1778,22 @@ namespace Mist{
}
}
ltt = tagTime;
// bufferLivePacket(thisPacket);
bufferLivePacket(tagTime, F.offset(), idx, F.getData(), F.getDataLen(), 0, F.isKeyframe);
if (ltt){
for (std::map<size_t, uint64_t>::iterator it = lastTagTime.begin(); it != lastTagTime.end(); ++it){
if (it->second == reTrack){continue;}
size_t iIdx = reTrackToID[it->second];
if (it->first < ltt){
meta.setNowms(iIdx, ltt-1);
it->second = ltt-1;
}
}
}
if (F.data[0] == 0x12 && amf_storage){
std::string mData = amf_storage->toJSON().toString();
bufferLivePacket(tagTime, F.offset(), idx, mData.c_str(), mData.size(), 0, true);
}else{
bufferLivePacket(tagTime, F.offset(), idx, F.getData(), F.getDataLen(), 0, F.isKeyframe);
}
if (!meta){config->is_active = false;}
}
break;

View file

@ -30,6 +30,12 @@ namespace Mist{
}
void OutSRT::sendNext(){
// Reached the end we wanted? Stop here.
if (filter_to > 0 && thisTime > filter_to && filter_to > filter_from){
config->is_active = false;
return;
}
char *dataPointer = 0;
size_t len = 0;
thisPacket.getString("data", dataPointer, len);
@ -37,35 +43,22 @@ namespace Mist{
if (len == 0 || (len == 1 && dataPointer[0] == ' ')){return;}
std::stringstream tmp;
if (!webVTT){tmp << lastNum++ << std::endl;}
uint64_t time = thisPacket.getTime();
// filter subtitle in specific timespan
if (filter_from > 0 && time < filter_from){
index++; // when using seek, the index is lost.
seek(filter_from);
return;
}
if (filter_to > 0 && time > filter_to && filter_to > filter_from){
config->is_active = false;
return;
}
char tmpBuf[50];
size_t tmpLen =
sprintf(tmpBuf, "%.2" PRIu64 ":%.2" PRIu64 ":%.2" PRIu64 ".%.3" PRIu64, (time / 3600000),
((time % 3600000) / 60000), (((time % 3600000) % 60000) / 1000), time % 1000);
sprintf(tmpBuf, "%.2" PRIu64 ":%.2" PRIu64 ":%.2" PRIu64 ".%.3" PRIu64, (thisTime / 3600000),
((thisTime % 3600000) / 60000), (((thisTime % 3600000) % 60000) / 1000), thisTime % 1000);
tmp.write(tmpBuf, tmpLen);
tmp << " --> ";
time += thisPacket.getInt("duration");
if (time == thisPacket.getTime()){time += len * 75 + 800;}
uint64_t time = thisTime + thisPacket.getInt("duration");
if (time == thisTime){time += len * 75 + 800;}
tmpLen = sprintf(tmpBuf, "%.2" PRIu64 ":%.2" PRIu64 ":%.2" PRIu64 ".%.3" PRIu64, (time / 3600000),
((time % 3600000) / 60000), (((time % 3600000) % 60000) / 1000), time % 1000);
tmp.write(tmpBuf, tmpLen);
tmp << std::endl;
myConn.SendNow(tmp.str());
// prevent double newlines
if (dataPointer[len - 1] == '\n'){--dataPointer;}
// prevent extra newlines
while (len && dataPointer[len - 1] == '\n'){--len;}
myConn.SendNow(dataPointer, len);
myConn.SendNow("\n\n");
}
@ -82,7 +75,7 @@ namespace Mist{
void OutSRT::onHTTP(){
std::string method = H.method;
webVTT = (H.url.find(".vtt") != std::string::npos) || (H.url.find(".webvtt") != std::string::npos);
if (H.GetVar("track") != ""){
if (H.GetVar("track").size()){
size_t tid = atoll(H.GetVar("track").c_str());
if (M.getValidTracks().count(tid)){
userSelect.clear();
@ -94,7 +87,10 @@ namespace Mist{
filter_to = 0;
index = 0;
if (H.GetVar("from") != ""){filter_from = JSON::Value(H.GetVar("from")).asInt();}
if (H.GetVar("from") != ""){
filter_from = JSON::Value(H.GetVar("from")).asInt();
seek(filter_from);
}
if (H.GetVar("to") != ""){filter_to = JSON::Value(H.GetVar("to")).asInt();}
if (filter_to){realTime = 0;}

View file

@ -180,6 +180,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("+AC3");
capa["codecs"][0u][1u].append("+MP2");
capa["codecs"][0u][1u].append("+opus");
capa["codecs"][0u][2u].append("+JSON");
capa["codecs"][1u][0u].append("rawts");
cfg->addConnectorOptions(8888, capa);
config = cfg;
@ -194,6 +195,25 @@ namespace Mist{
opt["arg_num"] = 1;
opt["help"] = "Target tsudp:// or tsrtp:// or tstcp:// URL to push out towards.";
cfg->addOption("target", opt);
capa["optional"]["datatrack"]["name"] = "MPEG Data track parser";
capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks";
capa["optional"]["datatrack"]["type"] = "select";
capa["optional"]["datatrack"]["option"] = "--datatrack";
capa["optional"]["datatrack"]["short"] = "D";
capa["optional"]["datatrack"]["default"] = "";
capa["optional"]["datatrack"]["select"][0u][0u] = "";
capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled";
capa["optional"]["datatrack"]["select"][1u][0u] = "json";
capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON";
opt.null();
opt["long"] = "datatrack";
opt["short"] = "D";
opt["arg"] = "string";
opt["default"] = "";
opt["help"] = "Which parser to use for data tracks";
config->addOption("datatrack", opt);
}
void OutTS::initialSeek(){
@ -290,6 +310,9 @@ namespace Mist{
onFinish();
return;
}
if (config->getString("datatrack") == "json"){
tsIn.setRawDataParser(TS::JSON);
}
}
// we now know we probably have a packet ready at the next 188 bytes
// remove from buffer and insert into TS input

View file

@ -226,8 +226,14 @@ namespace Mist{
fillPacket(dataPointer, dataLen, firstPack, video, keyframe, pkgPid, contPkg);
}else if (type == "meta"){
long unsigned int tempLen = dataLen;
if (codec == "JSON"){tempLen += 2;}
bs = TS::Packet::getPESMetaLeadIn(tempLen, packTime, M.getBps(thisIdx));
fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg);
if (codec == "JSON"){
char dLen[2];
Bit::htobs(dLen, dataLen);
fillPacket(dLen, 2, firstPack, video, keyframe, pkgPid, contPkg);
}
fillPacket(dataPointer, dataLen, firstPack, video, keyframe, pkgPid, contPkg);
}
if (packData.getBytesFree() < 184){

View file

@ -181,6 +181,9 @@ namespace Mist{
onFinish();
return;
}
if (config->getString("datatrack") == "json"){
tsIn.setRawDataParser(TS::JSON);
}
parseData = false;
wantRequest = true;
@ -225,6 +228,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("+AC3");
capa["codecs"][0u][1u].append("+MP2");
capa["codecs"][0u][1u].append("+opus");
capa["codecs"][0u][2u].append("+JSON");
capa["codecs"][1u][0u].append("rawts");
capa["optional"]["profile"]["name"] = "RIST profile";
@ -283,6 +287,25 @@ namespace Mist{
opt["arg_num"] = 1;
opt["help"] = "Target rist:// URL to push out towards.";
cfg->addOption("target", opt);
opt.null();
opt["long"] = "datatrack";
opt["short"] = "D";
opt["arg"] = "string";
opt["default"] = "";
opt["help"] = "Which parser to use for data tracks";
config->addOption("datatrack", opt);
capa["optional"]["datatrack"]["name"] = "MPEG Data track parser";
capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks";
capa["optional"]["datatrack"]["type"] = "select";
capa["optional"]["datatrack"]["option"] = "--datatrack";
capa["optional"]["datatrack"]["short"] = "D";
capa["optional"]["datatrack"]["default"] = "";
capa["optional"]["datatrack"]["select"][0u][0u] = "";
capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled";
capa["optional"]["datatrack"]["select"][1u][0u] = "json";
capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON";
}
// Buffers TS packets and sends after 7 are buffered.

View file

@ -119,6 +119,9 @@ namespace Mist{
onFinish();
return;
}
if (config->getString("datatrack") == "json"){
tsIn.setRawDataParser(TS::JSON);
}
parseData = false;
wantRequest = true;
}
@ -209,6 +212,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
capa["codecs"][0u][1u].append("opus");
capa["codecs"][0u][2u].append("JSON");
capa["codecs"][1u][0u].append("rawts");
cfg->addConnectorOptions(8889, capa);
config = cfg;
@ -279,6 +283,25 @@ namespace Mist{
opt["arg_num"] = 1;
opt["help"] = "Target srt:// URL to push out towards.";
cfg->addOption("target", opt);
capa["optional"]["datatrack"]["name"] = "MPEG Data track parser";
capa["optional"]["datatrack"]["help"] = "Which parser to use for data tracks";
capa["optional"]["datatrack"]["type"] = "select";
capa["optional"]["datatrack"]["option"] = "--datatrack";
capa["optional"]["datatrack"]["short"] = "D";
capa["optional"]["datatrack"]["default"] = "";
capa["optional"]["datatrack"]["select"][0u][0u] = "";
capa["optional"]["datatrack"]["select"][0u][1u] = "None / disabled";
capa["optional"]["datatrack"]["select"][1u][0u] = "json";
capa["optional"]["datatrack"]["select"][1u][1u] = "2b size-prepended JSON";
opt.null();
opt["long"] = "datatrack";
opt["short"] = "D";
opt["arg"] = "string";
opt["default"] = "";
opt["help"] = "Which parser to use for data tracks";
config->addOption("datatrack", opt);
}
// Buffers TS packets and sends after 7 are buffered.