From ad4c1abd0b02050fdff0f24eb0afdf2191fccc15 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 19 Jul 2021 16:03:05 +0200 Subject: [PATCH] Playlist support fixed --- lib/dtsc.cpp | 33 +++- lib/dtsc.h | 4 + src/input/input.cpp | 305 +++++++++++++++++++---------------- src/input/input.h | 1 - src/input/input_av.cpp | 12 +- src/input/input_buffer.cpp | 10 +- src/input/input_dtsc.cpp | 6 +- src/input/input_ebml.cpp | 34 ++-- src/input/input_ebml.h | 2 +- src/input/input_flv.cpp | 8 +- src/input/input_h264.cpp | 2 + src/input/input_hls.cpp | 9 +- src/input/input_ismv.cpp | 2 + src/input/input_mp3.cpp | 7 +- src/input/input_mp4.cpp | 5 +- src/input/input_playlist.cpp | 14 +- src/input/input_ts.cpp | 5 +- src/io.cpp | 104 ++++++------ src/io.h | 14 +- src/output/output.h | 3 - 20 files changed, 343 insertions(+), 237 deletions(-) diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index a5673844..acfd9c0f 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -1452,12 +1452,12 @@ namespace DTSC{ /// Resizes a given track to be able to hold the given amount of fragments, keys, parts and pages. /// Currently called exclusively from Meta::update(), to resize the internal structures. void Meta::resizeTrack(size_t source, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount, const char * reason){ - char pageName[NAME_BUFFER_SIZE]; IPC::semaphore resizeLock; if (!isMemBuf){ - snprintf(pageName, NAME_BUFFER_SIZE, "/" SHM_STREAM_TM, streamName.c_str(), getpid(), source); - resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + std::string pageName = "/"; + pageName += trackList.getPointer(trackPageField, source); + resizeLock.open(pageName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); resizeLock.wait(); } @@ -1492,9 +1492,8 @@ namespace DTSC{ memset(tMemBuf[source], 0, newPageSize); t.track = Util::RelAccX(tMemBuf[source], false); }else{ - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_TM, streamName.c_str(), getpid(), source); tM[source].master = true; - tM[source].init(pageName, newPageSize, true); + tM[source].init(trackList.getPointer(trackPageField, source), newPageSize, true); if (!tM[source].mapped){ FAIL_MSG("Failed to re-allocate shared memory for track %zu: %s", source, strerror(errno)); resizeLock.unlink(); @@ -1711,6 +1710,26 @@ namespace DTSC{ return tNumber; } + bool Meta::isClaimed(size_t trackIdx) const{ + return (trackList.getInt(trackPidField, trackIdx) != 0); + } + + void Meta::claimTrack(size_t trackIdx){ + if (trackList.getInt(trackPidField, trackIdx) != 0){ + FAIL_MSG("Cannot claim track: already claimed by PID %" PRIu64, trackList.getInt(trackPidField, trackIdx)); + return; + } + trackList.setInt(trackPidField, getpid(), trackIdx); + } + + void Meta::abandonTrack(size_t trackIdx){ + if (trackList.getInt(trackPidField, trackIdx) != getpid()){ + FAIL_MSG("Cannot abandon track: is claimed by PID %" PRIu64 ", not us", trackList.getInt(trackPidField, trackIdx)); + return; + } + trackList.setInt(trackPidField, 0, trackIdx); + } + /// Internal function that is called whenever a track is (re)written to the memory structures. /// Adds the needed fields and sets all the RelAccXFieldData members to point to them. void Meta::initializeTrack(Track &t, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){ @@ -2178,12 +2197,10 @@ namespace DTSC{ /// Removes the first key from the memory structure and caches. bool Meta::removeFirstKey(size_t trackIdx){ - char pageName[NAME_BUFFER_SIZE]; IPC::semaphore resizeLock; if (!isMemBuf){ - __pid_t trPid = trackList.getInt(trackPidField, trackIdx); - snprintf(pageName, NAME_BUFFER_SIZE, "/" SHM_STREAM_TM, streamName.c_str(), trPid, trackIdx); + const char * pageName = trackList.getPointer(trackPageField, trackIdx); resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!resizeLock.tryWait()){ MEDIUM_MSG("Metadata is busy, delaying deletion of key a bit"); diff --git a/lib/dtsc.h b/lib/dtsc.h index 19e6756c..59d4f79f 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -382,6 +382,10 @@ namespace DTSC{ void setMaxKeepAway(uint64_t maxKeepAway); uint64_t getMaxKeepAway() const; + void claimTrack(size_t trackIdx); + bool isClaimed(size_t trackIdx) const; + void abandonTrack(size_t trackIdx); + /*LTS-START*/ void setSourceTrack(size_t trackIdx, size_t sourceTrack); uint64_t getSourceTrack(size_t trackIdx) const; diff --git a/src/input/input.cpp b/src/input/input.cpp index 5b52c38c..2d3e6a0a 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -170,21 +170,7 @@ namespace Mist{ capa["optional"]["realtime"]["name"] = "Simulated Live"; capa["optional"]["realtime"]["help"] = "Make this input run as a simulated live stream"; capa["optional"]["realtime"]["option"] = "--realtime"; - option.null(); - option["long"] = "simulated-starttime"; - option["arg"] = "integer"; - option["short"] = "S"; - option["help"] = "Unix timestamp on which the simulated start of the stream is based."; - option["value"].append(0); - config->addOption("simulated-starttime", option); - capa["optional"]["simulated-starttime"]["name"] = "Simulated start time"; - capa["optional"]["simulated-starttime"]["help"] = - "The unix timestamp on which this stream is assumed to have started playback, or 0 for " - "automatic"; - capa["optional"]["simulated-starttime"]["option"] = "--simulated-starttime"; - capa["optional"]["simulated-starttime"]["type"] = "uint"; - capa["optional"]["simulated-starttime"]["default"] = 0; /*LTS-END*/ capa["optional"]["debug"]["name"] = "debug"; @@ -761,6 +747,15 @@ namespace Mist{ } INFO_MSG("Input started"); + + //Simulated real time inputs bypass most normal logic + if (config->getBool("realtime")){ + realtimeMainLoop(); + finish(); + INFO_MSG("Real-time input closing clean; reason: %s", Util::exitReason); + return; + } + meta.reInit(streamName, false); if (!openStreamSource()){ @@ -769,10 +764,8 @@ namespace Mist{ } parseStreamHeader(); - std::set validTracks; - if (publishesTracks()){ - validTracks = M.getMySourceTracks(getpid()); + std::set validTracks = M.getMySourceTracks(getpid()); if (!validTracks.size()){ userSelect.clear(); finish(); @@ -781,72 +774,9 @@ namespace Mist{ } } - timeOffset = 0; - uint64_t minFirstMs = 0; - - // If resume mode is on, find matching tracks and set timeOffset values to make sure we append to the tracks. - if (publishesTracks() && config->getBool("realtime")){ - seek(0); - - minFirstMs = 0xFFFFFFFFFFFFFFFFull; - uint64_t maxFirstMs = 0; - uint64_t minLastMs = 0xFFFFFFFFFFFFFFFFull; - uint64_t maxLastMs = 0; - - // track lowest firstms value - for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ - if (meta.getFirstms(*it) < minFirstMs){minFirstMs = meta.getFirstms(*it);} - if (meta.getFirstms(*it) > maxFirstMs){maxFirstMs = meta.getFirstms(*it);} - if (meta.getLastms(*it) < minLastMs){minLastMs = meta.getLastms(*it);} - if (meta.getLastms(*it) > maxLastMs){maxLastMs = meta.getLastms(*it);} - } - if (maxFirstMs - minFirstMs > 500){ - WARN_MSG("Begin timings of tracks for this file are %" PRIu64 - " ms apart. This may mess up playback to some degree. (Range: %" PRIu64 - "ms - %" PRIu64 "ms)", - maxFirstMs - minFirstMs, minFirstMs, maxFirstMs); - } - if (maxLastMs - minLastMs > 500){ - WARN_MSG("Stop timings of tracks for this file are %" PRIu64 - " ms apart. This may mess up playback to some degree. (Range: %" PRIu64 - "ms - %" PRIu64 "ms)", - maxLastMs - minLastMs, minLastMs, maxLastMs); - } - // find highest current time - for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ - timeOffset = std::max(timeOffset, (int64_t)meta.getLastms(*it)); - } - - if (timeOffset){ - if (minFirstMs == 0xFFFFFFFFFFFFFFFFull){minFirstMs = 0;} - MEDIUM_MSG("Offset is %" PRId64 - "ms, adding 40ms and subtracting the start time of %" PRIu64, - timeOffset, minFirstMs); - timeOffset += 40; // Add an artificial frame at 25 FPS to make sure we append, not overwrite - timeOffset -= minFirstMs; // we don't need to add the lowest firstms value to the offset, as it's already there - } - } - if (publishesTracks()){ - for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ - meta.setFirstms(*it, meta.getFirstms(*it)+timeOffset); - meta.setLastms(*it, 0); - } - } - - simStartTime = config->getInteger("simulated-starttime"); - if (!simStartTime){simStartTime = Util::bootMS();} - - std::string reason; - if (config->getBool("realtime")){ - realtimeMainLoop(); - }else{ - streamMainLoop(); - } - + streamMainLoop(); closeStreamSource(); - userSelect.clear(); - finish(); INFO_MSG("Input closing clean; reason: %s", Util::exitReason); return; @@ -855,30 +785,24 @@ namespace Mist{ void Input::streamMainLoop(){ uint64_t statTimer = 0; uint64_t startTime = Util::bootSecs(); - size_t tid; - size_t idx; Comms::Statistics statComm; getNext(); - tid = thisPacket.getTrackId(); - idx = M.trackIDToIndex(tid, getpid()); - if (thisPacket && !userSelect.count(idx)){ - userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + if (thisPacket && !userSelect.count(thisIdx)){ + userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } - while (thisPacket && config->is_active && userSelect[idx]){ - if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){ + while (thisPacket && config->is_active && userSelect[thisIdx]){ + if (userSelect[thisIdx].getStatus() & COMM_STATUS_REQDISCONNECT){ Util::logExitReason("buffer requested shutdown"); break; } bufferLivePacket(thisPacket); getNext(); if (!thisPacket){ - Util::logExitReason("invalid packet from getNext"); + Util::logExitReason("no more data"); break; } - tid = thisPacket.getTrackId(); - idx = M.trackIDToIndex(tid, getpid()); - if (thisPacket && !userSelect.count(idx)){ - userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + if (thisPacket && !userSelect.count(thisIdx)){ + userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } if (Util::bootSecs() - statTimer > 1){ @@ -912,39 +836,147 @@ namespace Mist{ } void Input::realtimeMainLoop(){ + MEDIUM_MSG("Starting real-time main loop!"); uint64_t statTimer = 0; uint64_t startTime = Util::bootSecs(); + size_t idx; Comms::Statistics statComm; - getNext(); - if (thisPacket && !userSelect.count(thisPacket.getTrackId())){ - size_t tid = thisPacket.getTrackId(); - userSelect[tid].reload(streamName, tid, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + + + DTSC::Meta liveMeta(config->getString("streamname"), false); + DTSC::veryUglyJitterOverride = SIMULATED_LIVE_BUFFER; + + + uint64_t minFirstMs = 0xFFFFFFFFFFFFFFFFull; + uint64_t maxFirstMs = 0; + uint64_t minLastMs = 0xFFFFFFFFFFFFFFFFull; + uint64_t maxLastMs = 0; + + // track lowest firstms value + std::set validTracks = M.getValidTracks(); + INFO_MSG("VoD metadata has %zu valid tracks", validTracks.size()); + if (!validTracks.size()){ + FAIL_MSG("No valid tracks! Aborting!"); + return; } - while (thisPacket && config->is_active && userSelect[thisPacket.getTrackId()]){ - thisPacket.nullMember("bpos"); - while (config->is_active && userSelect[thisPacket.getTrackId()] && - Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisPacket.getTime() + timeOffset) + simStartTime){ - Util::sleep(std::min(((thisPacket.getTime() + timeOffset) + simStartTime) - (Util::getMS() + SIMULATED_LIVE_BUFFER), + for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ + if (M.getFirstms(*it) < minFirstMs){minFirstMs = M.getFirstms(*it);} + if (M.getFirstms(*it) > maxFirstMs){maxFirstMs = M.getFirstms(*it);} + if (M.getLastms(*it) < minLastMs){minLastMs = M.getLastms(*it);} + if (M.getLastms(*it) > maxLastMs){maxLastMs = M.getLastms(*it);} + } + if (maxFirstMs - minFirstMs > 500){ + WARN_MSG("Begin timings of tracks for this file are %" PRIu64 + " ms apart. This may mess up playback to some degree. (Range: %" PRIu64 + "ms - %" PRIu64 "ms)", + maxFirstMs - minFirstMs, minFirstMs, maxFirstMs); + } + if (maxLastMs - minLastMs > 500){ + WARN_MSG("Stop timings of tracks for this file are %" PRIu64 + " ms apart. This may mess up playback to some degree. (Range: %" PRIu64 + "ms - %" PRIu64 "ms)", + maxLastMs - minLastMs, minLastMs, maxLastMs); + } + if (minFirstMs == 0xFFFFFFFFFFFFFFFFull){minFirstMs = 0;} + + // find highest current time + int64_t timeOffset = 0; + validTracks = liveMeta.getValidTracks(); + for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ + timeOffset = std::max(timeOffset, (int64_t)liveMeta.getLastms(*it)); + } + INFO_MSG("Live metadata has %zu valid tracks, last timestamp %" PRIu64, validTracks.size(), timeOffset); + if (timeOffset){ + MEDIUM_MSG("Offset is %" PRId64 + "ms, adding 40ms and subtracting the start time of %" PRIu64, + timeOffset, minFirstMs); + timeOffset += 40; // Add an artificial frame at 25 FPS to make sure we append, not overwrite + } + timeOffset -= minFirstMs; // we don't need to add the lowest firstms value to the offset, as it's already there + + /// This maps local track offsets to stream track offsets + std::map realTimeTrackMap; + + //No time offset and/or no currently valid tracks? + //That means this must be the first entry in this realtime stream. Create the tracks! + if (!timeOffset || !validTracks.size()){ + liveMeta.setBootMsOffset(Util::bootMS()); + validTracks = M.getValidTracks(); + size_t newID = 0; + for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ + size_t newIdx = liveMeta.addTrack(); + realTimeTrackMap[*it] = newIdx; + MEDIUM_MSG("Gonna write track %zu to %zu", *it, newIdx); + liveMeta.setID(newIdx, newID++); + liveMeta.setType(newIdx, M.getType(*it)); + liveMeta.setCodec(newIdx, M.getCodec(*it)); + liveMeta.setFpks(newIdx, M.getFpks(*it)); + liveMeta.setInit(newIdx, M.getInit(*it)); + liveMeta.setLang(newIdx, M.getLang(*it)); + liveMeta.setRate(newIdx, M.getRate(*it)); + liveMeta.setSize(newIdx, M.getSize(*it)); + liveMeta.setWidth(newIdx, M.getWidth(*it)); + liveMeta.setHeight(newIdx, M.getHeight(*it)); + } + }else{ + validTracks = M.getValidTracks(); + std::set validLive = liveMeta.getValidTracks(); + for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ + for (std::set::iterator lit = validLive.begin(); lit != validLive.end(); ++lit){ + if (liveMeta.isClaimed(*lit)){continue;} + if (liveMeta.getType(*lit) != M.getType(*it)){continue;} + if (liveMeta.getCodec(*lit) != M.getCodec(*it)){continue;} + if (liveMeta.getInit(*lit) != M.getInit(*it)){continue;} + //Matching type/codec/init! Use it! + realTimeTrackMap[*it] = *lit; + liveMeta.claimTrack(*lit); + MEDIUM_MSG("Gonna write track %zu to existing track %zu", *it, *lit); + break; + } + } + } + int64_t bootMsOffset = liveMeta.getBootMsOffset(); + validTracks.clear(); + + seek(0);/// \TODO Is this actually needed? + while (config->is_active){ + getNext(); + if (!thisPacket){ + Util::logExitReason("no more data"); + break; + } + idx = realTimeTrackMap.count(thisIdx) ? realTimeTrackMap[thisIdx] : INVALID_TRACK_ID; + if (thisPacket && !userSelect.count(idx)){ + userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + } + if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){ + Util::logExitReason("buffer requested shutdown"); + break; + } + while (config->is_active && userSelect[idx] && + Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisTime + timeOffset) + bootMsOffset){ + Util::sleep(std::min(((thisTime + timeOffset) + bootMsOffset) - (Util::getMS() + SIMULATED_LIVE_BUFFER), (uint64_t)1000)); } - uint64_t originalTime = thisPacket.getTime(); - thisPacket.setTime(originalTime + timeOffset); - bufferLivePacket(thisPacket); - thisPacket.setTime(originalTime); - getNext(); - if (thisPacket && !userSelect.count(thisPacket.getTrackId())){ - size_t tid = thisPacket.getTrackId(); - userSelect[tid].reload(streamName, tid, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + //Buffer the packet + if (idx == INVALID_TRACK_ID){ + INFO_MSG("Packet for track %zu has no valid index!", thisIdx); + }else{ + char *data; + size_t dataLen; + thisPacket.getString("data", data, dataLen); + bufferLivePacket(thisTime+timeOffset, thisPacket.getInt("offset"), idx, data, dataLen, 0, thisPacket.getFlag("keyframe"), liveMeta); } + if (Util::bootSecs() - statTimer > 1){ // Connect to stats for INPUT detection if (!statComm){statComm.reload();} if (statComm){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ - Util::logExitReason("received shutdown request from controller"); config->is_active = false; + Util::logExitReason("received shutdown request from controller"); return; } uint64_t now = Util::bootSecs(); @@ -952,21 +984,16 @@ namespace Mist{ statComm.setCRC(getpid()); statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); - statComm.setUp(0); - statComm.setDown(streamByteCount()); statComm.setTime(now - startTime); statComm.setLastSecond(0); - statComm.setHost(getConnectedBinHost()); + connStats(statComm); } statTimer = Util::bootSecs(); } } - if (!thisPacket){ - Util::logExitReason("invalid packet from getNext"); - } - if (thisPacket && !userSelect[thisPacket.getTrackId()]){ - Util::logExitReason("buffer shutdown"); + for (std::map::iterator it = realTimeTrackMap.begin(); it != realTimeTrackMap.end(); ++it){ + liveMeta.abandonTrack(it->second); } } @@ -1249,7 +1276,7 @@ namespace Mist{ pageIdx = i; } uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); - if (isBuffered(idx, pageNumber)){ + if (isBuffered(idx, pageNumber, meta)){ // Mark the page for removal after 15 seconds of no one watching it pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT; DONTEVEN_MSG("Track %zu, key %" PRIu32 " is already buffered in page %" PRIu32 @@ -1261,7 +1288,7 @@ namespace Mist{ uint64_t bufferTimer = Util::bootMS(); keyNum = pageNumber; IPC::sharedPage page; - if (!bufferStart(idx, pageNumber, page)){ + if (!bufferStart(idx, pageNumber, page, meta)){ WARN_MSG("bufferStart failed! Cancelling bufferFrame"); return false; } @@ -1288,7 +1315,7 @@ namespace Mist{ }else{ getNext(sourceIdx); // in case earlier seeking was imprecise, seek to the exact point - while (thisPacket && thisPacket.getTime() < keyTime){getNext(sourceIdx);} + while (thisPacket && thisTime < keyTime){getNext(sourceIdx);} } uint64_t lastBuffered = 0; uint32_t packCounter = 0; @@ -1313,8 +1340,8 @@ namespace Mist{ size_t partNo = 0; for (size_t i = 0; i < keyNum; ++i){partNo += keys.getParts(i);} DTSC::Parts parts(M.parts(idx)); - while (thisPacket && thisPacket.getTime() < stopTime){ - if (thisPacket.getTime() >= lastBuffered){ + while (thisPacket && thisTime < stopTime){ + if (thisTime >= lastBuffered){ if (sourceIdx != idx){ if (encryption.find(":") != std::string::npos || M.getEncryption(idx).find(":") != std::string::npos){ if (encryption == ""){ @@ -1325,11 +1352,11 @@ namespace Mist{ } if (encryption.substr(0, encryption.find('/')) == "CTR128"){ DTSC::Packet encPacket = aesCipher.encryptPacketCTR( - M, thisPacket, M.getIvec(idx) + M.getPartIndex(thisPacket.getTime(), idx), idx); + M, thisPacket, M.getIvec(idx) + M.getPartIndex(thisTime, idx), idx); thisPacket = encPacket; }else if (encryption.substr(0, encryption.find('/')) == "CBC128"){ char ivec[] ={0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - Bit::htobll(ivec + 8, M.getIvec(idx) + M.getPartIndex(thisPacket.getTime(), idx)); + Bit::htobll(ivec + 8, M.getIvec(idx) + M.getPartIndex(thisTime, idx)); DTSC::Packet encPacket = aesCipher.encryptPacketCBC(M, thisPacket, ivec, idx); thisPacket = encPacket; } @@ -1358,12 +1385,12 @@ namespace Mist{ INFO_MSG("Part size mismatch: %zu != %zu", dataLen, parts.getSize(partNo)); } ++partNo; - HIGH_MSG("Buffering VoD packet (%zuB) @%" PRIu64 " ms on track %zu with offset %" PRIu64, dataLen, thisPacket.getTime(), idx, thisPacket.getInt("offset")); - bufferNext(thisPacket.getTime(), thisPacket.getInt("offset"), idx, data, dataLen, + HIGH_MSG("Buffering VoD packet (%zuB) @%" PRIu64 " ms on track %zu with offset %" PRIu64, dataLen, thisTime, idx, thisPacket.getInt("offset")); + bufferNext(thisTime, thisPacket.getInt("offset"), idx, data, dataLen, thisPacket.getInt("bpos"), thisPacket.getFlag("keyframe"), page); ++packCounter; byteCounter += thisPacket.getDataLen(); - lastBuffered = thisPacket.getTime(); + lastBuffered = thisTime; } getNext(sourceIdx); } @@ -1383,7 +1410,7 @@ namespace Mist{ bufferFinalize(idx, page); bufferTimer = Util::bootMS() - bufferTimer; INFO_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) buffered in %" PRIu64 "ms", - idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisPacket.getTime(), bufferTimer); + idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer); INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter, tPages.getInt("parts", pageIdx), byteCounter); pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT; @@ -1392,17 +1419,25 @@ namespace Mist{ bool Input::atKeyFrame(){ static std::map lastSeen; - size_t idx = thisPacket.getTrackId(); // not in keyTimes? We're not at a keyframe. - if (!keyTimes[idx].count(thisPacket.getTime())){return false;} + if (!keyTimes[thisIdx].count(thisTime)){return false;} // skip double times - if (lastSeen.count(idx) && lastSeen[idx] == thisPacket.getTime()){return false;} + if (lastSeen.count(thisIdx) && lastSeen[thisIdx] == thisTime){return false;} // set last seen, and return true - lastSeen[idx] = thisPacket.getTime(); + lastSeen[thisIdx] = thisTime; return true; } bool Input::readExistingHeader(){ + if (config->getBool("realtime")){ + meta.reInit("", config->getString("input") + ".dtsh"); + if (!meta){return false;} + if (meta.version != DTSH_VERSION){ + INFO_MSG("Updating wrong version header file from version %u to %u", meta.version, DTSH_VERSION); + return false; + } + return meta; + } char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, config->getString("streamname").c_str()); IPC::sharedPage sp(pageName, 0, false, false); diff --git a/src/input/input.h b/src/input/input.h index 0e33c254..23729878 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -77,7 +77,6 @@ namespace Mist{ JSON::Value capa; - int64_t timeOffset; std::map > keyTimes; // Create server for user pages diff --git a/src/input/input_av.cpp b/src/input/input_av.cpp index fa3118ff..94774cb9 100644 --- a/src/input/input_av.cpp +++ b/src/input/input_av.cpp @@ -144,14 +144,14 @@ namespace Mist{ long long packOffset = 0; bool isKey = false; if (packTime < 0){packTime = 0;} - size_t idx = meta.trackIDToIndex(packet.stream_index + 1); + size_t idx = meta.trackIDToIndex(packet.stream_index); if (packet.flags & AV_PKT_FLAG_KEY && M.getType(idx) != "audio"){ isKey = true; } if (packet.pts != AV_NOPTS_VALUE && packet.pts != packet.dts){ packOffset = ((packet.pts - packet.dts) * 1000 * strm->time_base.num / strm->time_base.den); } - meta.update(packTime, packOffset, packet.stream_index + 1, packet.size, packet.pos, isKey); + meta.update(packTime, packOffset, idx, packet.size, packet.pos, isKey); av_packet_unref(&packet); } return true; @@ -161,11 +161,11 @@ namespace Mist{ AVPacket packet; while (av_read_frame(pFormatCtx, &packet) >= 0){ // filter tracks we don't care about - size_t idx = meta.trackIDToIndex(packet.stream_index + 1); + size_t idx = meta.trackIDToIndex(packet.stream_index); if (idx == INVALID_TRACK_ID){continue;} if (wantIdx != INVALID_TRACK_ID && idx != wantIdx){continue;} if (!userSelect.count(idx)){ - HIGH_MSG("Track %u not selected", packet.stream_index + 1); + HIGH_MSG("Track %u not selected", packet.stream_index); continue; } AVStream *strm = pFormatCtx->streams[packet.stream_index]; @@ -179,7 +179,9 @@ namespace Mist{ if (packet.pts != AV_NOPTS_VALUE && packet.pts != packet.dts){ packOffset = ((packet.pts - packet.dts) * 1000 * strm->time_base.num / strm->time_base.den); } - thisPacket.genericFill(packTime, packOffset, packet.stream_index + 1, + thisTime = packTime; + thisIdx = idx; + thisPacket.genericFill(packTime, packOffset, thisIdx, (const char *)packet.data, packet.size, 0, isKey); av_packet_unref(&packet); return; // success! diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 1a22f619..534f9f6e 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -476,9 +476,13 @@ namespace Mist{ } void inputBuffer::userOnDisconnect(size_t id){ if (sourcePids.count(id)){ - INFO_MSG("Disconnected track %zu", sourcePids[id]); - meta.reloadReplacedPagesIfNeeded(); - removeTrack(sourcePids[id]); + if (!resumeMode){ + INFO_MSG("Disconnected track %zu", sourcePids[id]); + meta.reloadReplacedPagesIfNeeded(); + removeTrack(sourcePids[id]); + }else{ + INFO_MSG("Track %zu lost its source, keeping it around for resume", sourcePids[id]); + } sourcePids.erase(id); } } diff --git a/src/input/input_dtsc.cpp b/src/input/input_dtsc.cpp index 7fe4759f..390c07fa 100644 --- a/src/input/input_dtsc.cpp +++ b/src/input/input_dtsc.cpp @@ -262,7 +262,7 @@ namespace Mist{ moreHeader = S.getMember("moreheader").asInt(); }else{ moreHeader = 0; - meta.reInit(streamName, moreHeader); + meta.reInit(isSingular() ? streamName : "", S); } free(pkt); @@ -329,6 +329,8 @@ namespace Mist{ return; } thisPacket.reInit(pBuf.data(), pBuf.size()); + thisTime = thisPacket.getTime(); + thisIdx = thisPacket.getTrackId(); seekNext(thisPos.seekTime, thisPos.trackID); fseek(F, thisPos.bytePos, SEEK_SET); } @@ -361,6 +363,8 @@ namespace Mist{ thisPacket.reInit(srcConn); // read the next packet before continuing continue; // parse the next packet before returning } + thisTime = thisPacket.getTime(); + thisIdx = thisPacket.getTrackId(); return; // we have a packet } } diff --git a/src/input/input_ebml.cpp b/src/input/input_ebml.cpp index 041d6c94..d1ac1074 100644 --- a/src/input/input_ebml.cpp +++ b/src/input/input_ebml.cpp @@ -182,7 +182,7 @@ namespace Mist{ // Create header file from file uint64_t bench = Util::getMicros(); if (!meta || (needsLock() && isSingular())){ - meta.reInit(streamName); + meta.reInit(isSingular() ? streamName : ""); } while (readElement()){ @@ -404,7 +404,7 @@ namespace Mist{ frameSize = assStr.size(); } if (frameSize){ - TP.add(newTime * timeScale, tNum, frameSize, lastClusterBPos, B.isKeyframe() && !isAudio, isVideo); + TP.add(newTime * timeScale, idx, frameSize, lastClusterBPos, B.isKeyframe() && !isAudio, isVideo); } } while (TP.hasPackets()){ @@ -481,6 +481,8 @@ namespace Mist{ } thisPacket.genericFill(C.time, C.offset, C.track, C.ptr, C.dsize, C.bpos, C.key); + thisTime = C.time; + thisIdx = C.track; } void InputEBML::getNext(size_t idx){ @@ -533,10 +535,10 @@ namespace Mist{ uint64_t tNum = B.getTrackNum(); uint64_t newTime = lastClusterTime + B.getTimecode(); trackPredictor &TP = packBuf[tNum]; - size_t trackIdx = M.trackIDToIndex(tNum, getpid()); - bool isVideo = (M.getType(trackIdx) == "video"); - bool isAudio = (M.getType(trackIdx) == "audio"); - bool isASS = (M.getCodec(trackIdx) == "subtitle" && M.getInit(trackIdx).size()); + thisIdx = M.trackIDToIndex(tNum, getpid()); + bool isVideo = (M.getType(thisIdx) == "video"); + bool isAudio = (M.getType(thisIdx) == "audio"); + bool isASS = (M.getCodec(thisIdx) == "subtitle" && M.getInit(thisIdx).size()); // If this is a new video keyframe, flush the corresponding trackPredictor if (isVideo && B.isKeyframe() && bufferedPacks){ @@ -546,7 +548,7 @@ namespace Mist{ fillPacket(C); TP.remove(); --bufferedPacks; - if (singleTrack && trackIdx != idx){getNext(idx);} + if (singleTrack && thisIdx != idx){getNext(idx);} return; } } @@ -555,19 +557,19 @@ namespace Mist{ for (uint64_t frameNo = 0; frameNo < B.getFrameCount(); ++frameNo){ if (frameNo){ - if (M.getCodec(trackIdx) == "AAC"){ - newTime += (1000000 / M.getRate(trackIdx)) / timeScale; // assume ~1000 samples per frame - }else if (M.getCodec(trackIdx) == "MP3"){ - newTime += (1152000 / M.getRate(trackIdx)) / timeScale; // 1152 samples per frame - }else if (M.getCodec(trackIdx) == "DTS"){ + if (M.getCodec(thisIdx) == "AAC"){ + newTime += (1000000 / M.getRate(thisIdx)) / timeScale; // assume ~1000 samples per frame + }else if (M.getCodec(thisIdx) == "MP3"){ + newTime += (1152000 / M.getRate(thisIdx)) / timeScale; // 1152 samples per frame + }else if (M.getCodec(thisIdx) == "DTS"){ // Assume 512 samples per frame (DVD default) // actual amount can be calculated from data, but data // is not available during header generation... // See: http://www.stnsoft.com/DVD/dtshdr.html - newTime += (512000 / M.getRate(trackIdx)) / timeScale; + newTime += (512000 / M.getRate(thisIdx)) / timeScale; }else{ ERROR_MSG("Unknown frame duration for codec %s - timestamps WILL be wrong!", - M.getCodec(trackIdx).c_str()); + M.getCodec(thisIdx).c_str()); } } uint32_t frameSize = B.getFrameSize(frameNo); @@ -579,7 +581,7 @@ namespace Mist{ memcpy(ptr, assStr.data(), frameSize); } if (frameSize){ - TP.add(newTime * timeScale, tNum, frameSize, lastClusterBPos, + TP.add(newTime * timeScale, thisIdx, frameSize, lastClusterBPos, B.isKeyframe() && !isAudio, isVideo, (void *)ptr); ++bufferedPacks; } @@ -590,7 +592,7 @@ namespace Mist{ fillPacket(C); TP.remove(); --bufferedPacks; - if (singleTrack && trackIdx != idx){getNext(idx);} + if (singleTrack && thisIdx != idx){getNext(idx);} }else{ // We didn't set thisPacket yet. Read another. // Recursing is fine, this can only happen a few times in a row. diff --git a/src/input/input_ebml.h b/src/input/input_ebml.h index 68c06914..b4c29d30 100644 --- a/src/input/input_ebml.h +++ b/src/input/input_ebml.h @@ -150,7 +150,7 @@ namespace Mist{ bool readExistingHeader(); void parseStreamHeader(){readHeader();} bool openStreamSource(){return true;} - bool needHeader(){return needsLock() && !readExistingHeader();} + bool needHeader(){return (config->getBool("realtime") || needsLock()) && !readExistingHeader();} double timeScale; bool wantBlocks; size_t totalBytes; diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 459c6943..d7399c23 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -79,7 +79,8 @@ namespace Mist{ bool inputFLV::readHeader(){ if (!inFile){return false;} - meta.reInit(config->getString("streamname")); + if (readExistingHeader()){return true;} + meta.reInit(isSingular() ? streamName : ""); // Create header file from FLV data Util::fseek(inFile, 13, SEEK_SET); AMF::Object amf_storage; @@ -142,8 +143,9 @@ namespace Mist{ if (!tmpTag.getDataLen() || (tmpTag.needsInitData() && tmpTag.isInitData())){ return getNext(idx); } - size_t tNumber = meta.trackIDToIndex(tmpTag.getTrackID(), getpid()); - thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tNumber, tmpTag.getData(), + thisIdx = meta.trackIDToIndex(tmpTag.getTrackID(), getpid()); + thisTime = tmpTag.tagTime(); + thisPacket.genericFill(thisTime, tmpTag.offset(), thisIdx, tmpTag.getData(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); if (M.getCodec(idx) == "PCM" && M.getSize(idx) == 16){ diff --git a/src/input/input_h264.cpp b/src/input/input_h264.cpp index 7b410e88..b9c72e47 100644 --- a/src/input/input_h264.cpp +++ b/src/input/input_h264.cpp @@ -122,6 +122,8 @@ namespace Mist{ if (M.getFpks(tNumber)){ts = frameCount * (1000000 / M.getFpks(tNumber));} thisPacket.genericFill(ts, 0, tNumber, 0, 0, 0, h264::isKeyframe(NAL.data(), nalSize)); thisPacket.appendNal(NAL.data(), nalSize); + thisTime = ts; + thisIdx = tNumber; ++frameCount; return; } diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 8e87623f..f58f41b3 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -629,7 +629,7 @@ namespace Mist{ FAIL_MSG("Failed to load HLS playlist, aborting"); return; } - meta.reInit(config->getString("streamname"), false); + meta.reInit(isSingular() ? streamName : "", false); INFO_MSG("Parsing live stream to create header..."); TS::Packet packet; // to analyse and extract data int pidCounter = 1; @@ -758,7 +758,7 @@ namespace Mist{ char *data; size_t dataLen; bool hasPacket = false; - meta.reInit(config->getString("streamname"), true); + meta.reInit(isSingular() ? streamName : ""); tthread::lock_guard guard(entryMutex); for (std::map >::iterator pListIt = listEntries.begin(); @@ -832,7 +832,6 @@ namespace Mist{ // Finally save the offset as part of the TS segment. This is required for bufferframe // to work correctly, since not every segment might have an UTC timestamp tag std::deque &curList = listEntries[pListIt->first]; - INFO_MSG("Saving offset of '%" PRId64 "' to current TS segment", plsTimeOffset[pListIt->first]); curList.at(entId-1).timeOffset = plsTimeOffset[pListIt->first]; } } @@ -852,6 +851,7 @@ namespace Mist{ } bool inputHLS::needsLock(){ + if (config->getBool("realtime")){return false;} if (isLiveDVR){ return true; } @@ -1013,6 +1013,8 @@ namespace Mist{ // overwrite trackId on success Bit::htobl(thisPacket.getData() + 8, tid); Bit::htobll(thisPacket.getData() + 12, packetTime); + thisTime = packetTime; + thisIdx = tid; return; // Success! } @@ -1071,6 +1073,7 @@ namespace Mist{ // Note: bpos is overloaded here for playlist entry! void inputHLS::seek(uint64_t seekTime, size_t idx){ + if (idx == INVALID_TRACK_ID){return;} plsTimeOffset.clear(); plsLastTime.clear(); plsInterval.clear(); diff --git a/src/input/input_ismv.cpp b/src/input/input_ismv.cpp index 3acea387..4e5a5cfa 100644 --- a/src/input/input_ismv.cpp +++ b/src/input/input_ismv.cpp @@ -108,6 +108,8 @@ namespace Mist{ thisPacket.genericFill(thisPos.time / 10000, thisPos.offset / 10000, thisPos.trackId, dataPointer, thisPos.size, 0, thisPos.isKeyFrame); + thisTime = thisPos.time/1000; + thisIdx = thisPos.trackId; if (buffered.size() < 2 * (idx == INVALID_TRACK_ID ? M.getValidTracks().size() : 1)){ std::set validTracks = M.getValidTracks(); diff --git a/src/input/input_mp3.cpp b/src/input/input_mp3.cpp index cbe75a5a..6bc656dd 100644 --- a/src/input/input_mp3.cpp +++ b/src/input/input_mp3.cpp @@ -51,7 +51,7 @@ namespace Mist{ bool inputMP3::readHeader(){ if (!inFile){return false;} - meta.reInit(config->getString("streamname")); + meta.reInit(isSingular() ? streamName : ""); size_t tNum = meta.addTrack(); meta.setID(tNum, tNum); meta.setType(tNum, "audio"); @@ -142,13 +142,16 @@ namespace Mist{ fseek(inFile, filePos + dataSize, SEEK_SET); // Create a json value with the right data - thisPacket.genericFill(timestamp, 0, idx, packHeader, dataSize, filePos, false); + thisPacket.genericFill(timestamp, 0, 0, packHeader, dataSize, filePos, false); + thisTime = timestamp; + thisIdx = 0; // Update the internal timestamp timestamp += (sampleCount / (sampleRate / 1000)); } void inputMP3::seek(uint64_t seekTime, size_t idx){ + idx = 0; DTSC::Keys keys(M.keys(idx)); uint32_t keyNum = M.getKeyNumForTime(idx, seekTime); fseek(inFile, keys.getBpos(keyNum), SEEK_SET); diff --git a/src/input/input_mp4.cpp b/src/input/input_mp4.cpp index 45dcd19a..90764108 100644 --- a/src/input/input_mp4.cpp +++ b/src/input/input_mp4.cpp @@ -209,6 +209,7 @@ namespace Mist{ INFO_MSG("inFile failed!"); return false; } + if (readExistingHeader()){return true;} // first we get the necessary header parts size_t tNumber = 0; @@ -238,7 +239,7 @@ namespace Mist{ if (readExistingHeader()){return true;} HIGH_MSG("Not read existing header"); - meta.reInit(streamName); + meta.reInit(isSingular() ? streamName : ""); tNumber = 0; // Create header file from MP4 data @@ -508,6 +509,8 @@ namespace Mist{ }else{ thisPacket.genericFill(curPart.time, curPart.offset, curPart.trackID, data, curPart.size, 0, isKeyframe); } + thisTime = curPart.time; + thisIdx = curPart.trackID; // get the next part for this track curPart.index++; diff --git a/src/input/input_playlist.cpp b/src/input/input_playlist.cpp index 2d6e8928..7e132040 100644 --- a/src/input/input_playlist.cpp +++ b/src/input/input_playlist.cpp @@ -41,8 +41,14 @@ namespace Mist{ void inputPlaylist::streamMainLoop(){ bool seenValidEntry = true; - uint64_t startTime = Util::bootMS(); + Comms::Users killSwitch; + killSwitch.reload(streamName, (size_t)INVALID_TRACK_ID, (uint8_t)(COMM_STATUS_ACTIVE | COMM_STATUS_DONOTTRACK)); while (config->is_active){ + if (killSwitch && killSwitch.getStatus() & COMM_STATUS_REQDISCONNECT){ + Util::logExitReason("buffer requested shutdown"); + config->is_active = false; + break; + } struct tm *wTime; time_t nowTime = time(0); wTime = localtime(&nowTime); @@ -77,7 +83,6 @@ namespace Mist{ std::map overrides; overrides["realtime"] = "1"; overrides["alwaysStart"] = ""; // Just making this value "available" is enough - overrides["simulated-starttime"] = JSON::Value(startTime).asString(); std::string srcPath = config->getString("input"); if ((currentSource.size() && currentSource[0] == '/') || srcPath.rfind('/') == std::string::npos){ srcPath = currentSource; @@ -106,6 +111,11 @@ namespace Mist{ } seenValidEntry = true; while (Util::Procs::isRunning(spawn_pid) && config->is_active){ + if (killSwitch && killSwitch.getStatus() & COMM_STATUS_REQDISCONNECT){ + Util::logExitReason("buffer requested shutdown"); + config->is_active = false; + break; + } Util::sleep(1000); if (reloadOn != 0xFFFF){ time_t nowTime = time(0); diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index e5467b18..feeb2d6a 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -344,7 +344,7 @@ namespace Mist{ ///\todo Find errors, perhaps parts can be made more modular bool inputTS::readHeader(){ if (!inFile){return false;} - meta.reInit(streamName); + meta.reInit(isSingular() ? streamName : ""); TS::Packet packet; // to analyse and extract data DTSC::Packet headerPack; fseek(inFile, 0, SEEK_SET); // seek to beginning @@ -430,7 +430,8 @@ namespace Mist{ return; } tsStream.initializeMetadata(meta); - size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid()); + thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid()); + thisTime = thisPacket.getTime(); if (thisIdx == INVALID_TRACK_ID){getNext(idx);} } diff --git a/src/io.cpp b/src/io.cpp index d3d94827..5d6db506 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -37,15 +37,15 @@ namespace Mist{ /// Buffering itself is done by bufferNext(). ///\param tid The trackid of the page to start buffering ///\param pageNumber The number of the page to start buffering - bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page){ + bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta){ VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %" PRIu32, streamName.c_str(), idx, pageNumber); // Initialize the stream metadata if it does not yet exist #ifndef TSLIVE_INPUT - if (!meta){meta.reInit(streamName);} + if (!aMeta){aMeta.reInit(streamName);} #endif - if (!meta.getValidTracks().size()){ - meta.clear(); + if (!aMeta.getValidTracks().size()){ + aMeta.clear(); return false; } @@ -56,7 +56,7 @@ namespace Mist{ page.close(); } - Util::RelAccX &tPages = meta.pages(idx); + Util::RelAccX &tPages = aMeta.pages(idx); uint32_t pageIdx = INVALID_KEY_NUM; for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ @@ -79,7 +79,7 @@ namespace Mist{ } // If the page is already buffered, ignore this request - if (isBuffered(idx, pageNumber)){ + if (isBuffered(idx, pageNumber, aMeta)){ INFO_MSG("Page %" PRIu32 " on track %zu already buffered", pageNumber, idx); ///\return false if the page was already buffered. return false; @@ -171,16 +171,16 @@ namespace Mist{ /// Checks whether a key is buffered ///\param tid The trackid on which to locate the key ///\param keyNum The number of the keyframe to find - bool InOutBase::isBuffered(size_t idx, uint32_t keyNum){ + bool InOutBase::isBuffered(size_t idx, uint32_t keyNum, DTSC::Meta & aMeta){ ///\return The result of bufferedOnPage(tid, keyNum) - return bufferedOnPage(idx, keyNum) != INVALID_KEY_NUM; + return bufferedOnPage(idx, keyNum, aMeta) != INVALID_KEY_NUM; } /// Returns the pagenumber where this key is buffered on ///\param tid The trackid on which to locate the key ///\param keyNum The number of the keyframe to find - uint32_t InOutBase::bufferedOnPage(size_t idx, uint32_t keyNum){ - Util::RelAccX &tPages = meta.pages(idx); + uint32_t InOutBase::bufferedOnPage(size_t idx, uint32_t keyNum, DTSC::Meta & aMeta){ + Util::RelAccX &tPages = aMeta.pages(idx); for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ uint64_t pageNum = tPages.getInt("firstkey", i); @@ -198,6 +198,13 @@ namespace Mist{ ///\param pack The packet to buffer void InOutBase::bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page){ + bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, page, meta); + } + + /// Buffers the next packet on the currently opened page + ///\param pack The packet to buffer + void InOutBase::bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, + size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page, DTSC::Meta & aMeta){ size_t packDataLen = 24 + (packOffset ? 17 : 0) + (packBytePos ? 15 : 0) + (isKeyframe ? 19 : 0) + packDataSize + 11; @@ -209,10 +216,10 @@ namespace Mist{ } // these checks were already done in bufferSinglePacket, but we check again just to be sure - if (!meta.getVod() && packTime < meta.getLastms(packTrack)){ + if (!aMeta.getVod() && packTime < aMeta.getLastms(packTrack)){ DEBUG_MSG(((multiWrong == 0) ? DLVL_WARN : DLVL_HIGH), "Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack, - packTime, meta.getLastms(packTrack)); + packTime, aMeta.getLastms(packTrack)); multiWrong = true; return; } @@ -223,7 +230,7 @@ namespace Mist{ } multiWrong = false; - Util::RelAccX &tPages = meta.pages(packTrack); + Util::RelAccX &tPages = aMeta.pages(packTrack); uint32_t pageIdx = 0; uint32_t currPagNum = atoi(page.name.data() + page.name.rfind('_') + 1); Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey"); @@ -286,9 +293,7 @@ namespace Mist{ } /// Wraps up the buffering of a shared memory data page - /// - /// Registers the data page on the track index page as well - ///\param tid The trackid of the page to finalize + /// \param idx The track index of the page to finalize void InOutBase::bufferFinalize(size_t idx, IPC::sharedPage & page){ // If no page is open, do nothing if (!page){ @@ -318,11 +323,7 @@ namespace Mist{ } /// Buffers a live packet to a page. - /// - /// Handles both buffering and creation of new pages - /// - /// Initiates/continues negotiation with the buffer as well - ///\param packet The packet to buffer + /// Calls bufferLivePacket with full arguments internally. void InOutBase::bufferLivePacket(const DTSC::Packet &packet){ size_t idx = M.trackIDToIndex(packet.getTrackId(), getpid()); if (idx == INVALID_TRACK_ID){ @@ -337,19 +338,28 @@ namespace Mist{ /// \TODO META Build something that should actually be able to deal with "extra" values } + /// Calls bufferLivePacket with additional argument for internal metadata reference internally. void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ - meta.reloadReplacedPagesIfNeeded(); - meta.setLive(true); + bufferLivePacket(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, meta); + } + + ///Buffers the given packet data into the given metadata structure. + ///Uses class member variables livePage and curPageNum internally for bookkeeping. + ///These member variables are not (and should not, in the future) be accessed anywhere else. + void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, + size_t packDataSize, uint64_t packBytePos, bool isKeyframe, DTSC::Meta &aMeta){ + aMeta.reloadReplacedPagesIfNeeded(); + aMeta.setLive(true); // Store the trackid for easier access // Do nothing if the trackid is invalid if (packTrack == INVALID_TRACK_ID){return;} // Store the trackid for easier access - Util::RelAccX &tPages = meta.pages(packTrack); + Util::RelAccX &tPages = aMeta.pages(packTrack); - if (M.getType(packTrack) != "video"){ + if (aMeta.getType(packTrack) != "video"){ isKeyframe = false; if (!tPages.getEndPos() || !livePage[packTrack]){ // Assume this is the first packet on the track @@ -363,20 +373,20 @@ namespace Mist{ // For live streams, ignore packets that make no sense // This also happens in bufferNext, with the same rules - if (M.getLive()){ - if (packTime < M.getLastms(packTrack)){ + if (aMeta.getLive()){ + if (packTime < aMeta.getLastms(packTrack)){ HIGH_MSG("Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack, - packTime, M.getLastms(packTrack)); + packTime, aMeta.getLastms(packTrack)); return; } - if (packTime > M.getLastms(packTrack) + 30000 && M.getLastms(packTrack)){ - WARN_MSG("Sudden jump in timestamp from %" PRIu64 " to %" PRIu64, M.getLastms(packTrack), packTime); + if (packTime > aMeta.getLastms(packTrack) + 30000 && aMeta.getLastms(packTrack)){ + WARN_MSG("Sudden jump in timestamp from %" PRIu64 " to %" PRIu64, aMeta.getLastms(packTrack), packTime); } } // Determine if we need to open the next page if (isKeyframe){ - updateTrackFromKeyframe(packTrack, packData, packDataSize); + updateTrackFromKeyframe(packTrack, packData, packDataSize, aMeta); uint64_t endPage = tPages.getEndPos(); size_t curPage = 0; size_t currPagNum = atoi(livePage[packTrack].name.data() + livePage[packTrack].name.rfind('_') + 1); @@ -390,15 +400,15 @@ namespace Mist{ // If there is no page, create it if (!livePage[packTrack]){ - size_t keyNum = M.getKeyNumForTime(packTrack, packTime); + size_t keyNum = aMeta.getKeyNumForTime(packTrack, packTime); if (keyNum == INVALID_KEY_NUM){ curPageNum[packTrack] = 0; }else{ - curPageNum[packTrack] = M.getKeyNumForTime(packTrack, packTime) + 1; + curPageNum[packTrack] = aMeta.getKeyNumForTime(packTrack, packTime) + 1; } if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){ - meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); + aMeta.resizeTrack(packTrack, aMeta.fragments(packTrack).getRCount(), aMeta.keys(packTrack).getRCount(), aMeta.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); } tPages.addRecords(1); @@ -409,7 +419,7 @@ namespace Mist{ tPages.setInt("avail", 0, endPage); curPage = endPage; DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack); - if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack])){ + if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack], aMeta)){ // if this fails, return instantly without actually buffering the packet WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); return; @@ -424,7 +434,7 @@ namespace Mist{ tPages.getInt("firstkey", curPage), packTrack, curPageNum[packTrack]); if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){ - meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); + aMeta.resizeTrack(packTrack, aMeta.fragments(packTrack).getRCount(), aMeta.keys(packTrack).getRCount(), aMeta.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages"); } tPages.addRecords(1); @@ -436,7 +446,7 @@ namespace Mist{ curPage = endPage; if (livePage[packTrack]){bufferFinalize(packTrack, livePage[packTrack]);} DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack); - if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack])){ + if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack], aMeta)){ // if this fails, return instantly without actually buffering the packet WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime); return; @@ -462,13 +472,13 @@ namespace Mist{ // Buffer the packet DONTEVEN_MSG("Buffering live packet (%zuB) @%" PRIu64 " ms on track %" PRIu32 " with offset %" PRIu64, packDataSize, packTime, packTrack, packOffset); - bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, livePage[packTrack]); - meta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe); + bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, livePage[packTrack], aMeta); + aMeta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe); } ///Handles updating track metadata from a new keyframe, if applicable - void InOutBase::updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize){ - if (meta.getCodec(packTrack) == "H264"){ + void InOutBase::updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize, DTSC::Meta & aMeta){ + if (aMeta.getCodec(packTrack) == "H264"){ //H264 packets are 4-byte size-prepended NAL units size_t offset = 0; while (offset+4 < packDataSize){ @@ -480,14 +490,14 @@ namespace Mist{ uint8_t nalType = (packData[offset+4] & 0x1F); if (nalType == 7){//SPS, update width/height/FPS h264::SPSMeta hMeta = h264::sequenceParameterSet(packData+offset+4, nalLen).getCharacteristics(); - meta.setWidth(packTrack, hMeta.width); - meta.setHeight(packTrack, hMeta.height); - meta.setFpks(packTrack, hMeta.fps*1000); + aMeta.setWidth(packTrack, hMeta.width); + aMeta.setHeight(packTrack, hMeta.height); + aMeta.setFpks(packTrack, hMeta.fps*1000); } offset += nalLen+4; } } - if (meta.getCodec(packTrack) == "VP8"){ + if (aMeta.getCodec(packTrack) == "VP8"){ //VP8 packets have a simple header for keyframes //Reference: https://www.rfc-editor.org/rfc/rfc6386.html#section-9.1 if (packData[3] == 0x9d && packData[4] == 0x01 && packData[5] == 0x2a){ @@ -506,8 +516,8 @@ namespace Mist{ case 2: h *= 5/3; break; case 3: h *= 2; break; } - meta.setWidth(packTrack, w); - meta.setHeight(packTrack, h); + aMeta.setWidth(packTrack, w); + aMeta.setHeight(packTrack, h); } } } diff --git a/src/io.h b/src/io.h index e0652efd..d839952a 100644 --- a/src/io.h +++ b/src/io.h @@ -14,27 +14,33 @@ namespace Mist{ public: InOutBase(); - bool isBuffered(size_t idx, uint32_t keyNum); - uint32_t bufferedOnPage(size_t idx, uint32_t keyNum); + bool isBuffered(size_t idx, uint32_t keyNum, DTSC::Meta & aMeta); + uint32_t bufferedOnPage(size_t idx, uint32_t keyNum, DTSC::Meta & aMeta); size_t getMainSelectedTrack(); - bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page); + bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta); void bufferFinalize(size_t idx, IPC::sharedPage & page); bool isCurrentLivePage(size_t idx, uint32_t pageNumber); void bufferRemove(size_t idx, uint32_t pageNumber); void bufferLivePacket(const DTSC::Packet &packet); + void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, + size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page, DTSC::Meta & aMeta); void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page); void bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe); + void bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, + size_t packDataSize, uint64_t packBytePos, bool isKeyframe, DTSC::Meta & aMeta); protected: - void updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize); + void updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize, DTSC::Meta & aMeta); bool standAlone; DTSC::Packet thisPacket; // The current packet that is being parsed + size_t thisIdx; //Track index of current packet + uint64_t thisTime; //Time of current packet std::string streamName; diff --git a/src/output/output.h b/src/output/output.h index 335c0b14..5ec379f8 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -155,9 +155,6 @@ namespace Mist{ uint64_t firstPacketTime; uint64_t lastPacketTime; - size_t thisIdx; - uint64_t thisTime; - std::map curPage; ///< For each track, holds the page that is currently being written. };