From 1f4b523b1b73c474cd1f1e3781e5ad81771b2010 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Tue, 7 Apr 2015 14:00:08 +0200 Subject: [PATCH] Initial TS Input commit --- CMakeLists.txt | 23 +- Makefile | 6 + lib/adts.cpp | 123 +++++++ lib/adts.h | 25 ++ lib/bitstream.cpp | 2 +- lib/bitstream.h | 2 +- lib/dtsc.h | 2 +- lib/dtscmeta.cpp | 16 +- lib/h264.cpp | 71 ++++ lib/h264.h | 4 + lib/nal.cpp | 120 +++++++ lib/nal.h | 18 + lib/procs.cpp | 25 ++ lib/procs.h | 4 + lib/shared_memory.cpp | 39 +++ lib/shared_memory.h | 11 + lib/ts_packet.cpp | 20 +- lib/ts_packet.h | 3 + lib/ts_stream.cpp | 324 +++++++++++++++++ lib/ts_stream.h | 37 ++ src/analysers/tsstream_analyser.cpp | 54 +++ src/controller/controller_api.cpp | 6 +- src/controller/controller_streams.cpp | 44 ++- src/controller/controller_streams.h | 2 + src/input/input.cpp | 302 +++++++++------- src/input/input.h | 8 +- src/input/input_buffer.cpp | 59 ++-- src/input/input_ts.cpp | 477 ++++---------------------- src/input/input_ts.h | 62 +--- src/input/mist_in.cpp | 9 + src/io.cpp | 30 +- src/io.h | 4 +- src/output/output.cpp | 11 +- 33 files changed, 1300 insertions(+), 643 deletions(-) create mode 100644 lib/adts.cpp create mode 100644 lib/adts.h create mode 100644 lib/h264.cpp create mode 100644 lib/h264.h create mode 100644 lib/ts_stream.cpp create mode 100644 lib/ts_stream.h create mode 100755 src/analysers/tsstream_analyser.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 366f9859..76a40949 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -113,6 +113,7 @@ add_definitions(-g -funsigned-char -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -D # MistLib - Header Files # ######################################## set(libHeaders + ${SOURCE_DIR}/lib/adts.h ${SOURCE_DIR}/lib/amf.h ${SOURCE_DIR}/lib/auth.h ${SOURCE_DIR}/lib/base64.h @@ -124,6 +125,7 @@ set(libHeaders ${SOURCE_DIR}/lib/dtsc.h ${SOURCE_DIR}/lib/encryption.h ${SOURCE_DIR}/lib/flv_tag.h + ${SOURCE_DIR}/lib/h264.h ${SOURCE_DIR}/lib/http_parser.h ${SOURCE_DIR}/lib/json.h ${SOURCE_DIR}/lib/mp4_adobe.h @@ -146,6 +148,7 @@ set(libHeaders ${SOURCE_DIR}/lib/timing.h ${SOURCE_DIR}/lib/tinythread.h ${SOURCE_DIR}/lib/ts_packet.h + ${SOURCE_DIR}/lib/ts_stream.h ${SOURCE_DIR}/lib/vorbis.h ) @@ -153,6 +156,7 @@ set(libHeaders # MistLib - Source Files # ######################################## set(libSources + ${SOURCE_DIR}/lib/adts.cpp ${SOURCE_DIR}/lib/amf.cpp ${SOURCE_DIR}/lib/auth.cpp ${SOURCE_DIR}/lib/base64.cpp @@ -163,6 +167,7 @@ set(libSources ${SOURCE_DIR}/lib/dtscmeta.cpp ${SOURCE_DIR}/lib/encryption.cpp ${SOURCE_DIR}/lib/flv_tag.cpp + ${SOURCE_DIR}/lib/h264.cpp ${SOURCE_DIR}/lib/http_parser.cpp ${SOURCE_DIR}/lib/json.cpp ${SOURCE_DIR}/lib/mp4_adobe.cpp @@ -184,6 +189,7 @@ set(libSources ${SOURCE_DIR}/lib/timing.cpp ${SOURCE_DIR}/lib/tinythread.cpp ${SOURCE_DIR}/lib/ts_packet.cpp + ${SOURCE_DIR}/lib/ts_stream.cpp ${SOURCE_DIR}/lib/vorbis.cpp ) ######################################## @@ -243,6 +249,8 @@ makeAnalyser(MP4 mp4) makeAnalyser(OGG ogg) makeAnalyser(RTP rtp) #LTS makeAnalyser(RTSP rtsp_rtp) #LTS +makeAnalyser(TS ts) #LTS +makeAnalyser(TSStream tsstream) #LTS makeAnalyser(Stats stats) #LTS ######################################## @@ -260,9 +268,21 @@ macro(makeInput inputName format) src/input/input_${format}.cpp src/io.cpp ) + + #Set compile definitions + unset(my_definitions) + if (";${ARGN};" MATCHES ";nolock;")#Currently only used in TSStream + list(APPEND my_definitions "INPUT_NOLOCK") + endif() + if (";${ARGN};" MATCHES ";live;")#Currently only used in TSStream + list(APPEND my_definitions "INPUT_LIVE") + endif() + list(APPEND my_definitions "INPUTTYPE=\"input_${format}.h\"") + set_target_properties(MistIn${inputName} - PROPERTIES COMPILE_DEFINITIONS INPUTTYPE=\"input_${format}.h\" + PROPERTIES COMPILE_DEFINITIONS "${my_definitions}" ) + target_link_libraries(MistIn${inputName} mist ) @@ -281,6 +301,7 @@ makeInput(Buffer buffer) makeInput(ISMV ismv)#LTS makeInput(MP4 mp4)#LTS makeInput(TS ts)#LTS +makeInput(TSStream ts nolock live)#LTS makeInput(Folder folder folder)#LTS ######################################## diff --git a/Makefile b/Makefile index 6e4e8211..c07235d9 100644 --- a/Makefile +++ b/Makefile @@ -152,6 +152,12 @@ MistInTS: override CPPFLAGS += "-DINPUTTYPE=\"input_ts.h\"" MistInTS: src/input/mist_in.cpp src/input/input.cpp src/input/input_ts.cpp src/io.cpp $(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@ +inputs: MistInTSLive +MistInTSLive: override LDLIBS += $(THREADLIB) +MistInTSLive: override CPPFLAGS += "-DINPUTTYPE=\"input_tslive.h\"" +MistInTSLive: src/input/mist_in.cpp src/input/input.cpp src/input/input_tslive.cpp src/io.cpp + $(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@ + inputs: MistInMP4 MistInMP4: override LDLIBS += $(THREADLIB) MistInMP4: override CPPFLAGS += "-DINPUTTYPE=\"input_mp4.h\"" diff --git a/lib/adts.cpp b/lib/adts.cpp new file mode 100644 index 00000000..21af9707 --- /dev/null +++ b/lib/adts.cpp @@ -0,0 +1,123 @@ +#include "adts.h" +#include +#include + +#include + +#include "defines.h" + +namespace aac { + adts::adts(){ + data = NULL; + len = 0; + } + + adts::adts(char * _data, unsigned long _len){ + len = _len; + data = (char*)malloc(len); + memcpy(data, _data, len); + } + + adts::adts(const adts & rhs){ + *this = rhs; + } + + adts& adts::operator = (const adts & rhs){ + len = rhs.len; + data = (char*)malloc(len); + memcpy(data, rhs.data, len); + return * this; + } + + adts::~adts(){ + if (data){ + free(data); + } + } + + unsigned long adts::getAACProfile(){ + if (!data || !len){ + return 0; + } + return ((data[2] >> 6) & 0x03) + 1; + } + + unsigned long adts::getFrequencyIndex(){ + if (!data || !len){ + return 0; + } + return ((data[2] >> 2) & 0x0F); + + } + + unsigned long adts::getFrequency(){ + if (!data || !len){ + return 0; + } + switch(getFrequencyIndex()){ + case 0: return 96000; break; + case 1: return 88200; break; + case 2: return 64000; break; + case 3: return 48000; break; + case 4: return 44100; break; + case 5: return 32000; break; + case 6: return 24000; break; + case 7: return 22050; break; + case 8: return 16000; break; + case 9: return 12000; break; + case 10: return 11025; break; + case 11: return 8000; break; + case 12: return 7350; break; + default: return 0; break; + } + } + + unsigned long adts::getChannelConfig(){ + if (!data || !len){ + return 0; + } + return ((data[2] & 0x01) << 2) | ((data[3] >> 6) & 0x03); + } + + unsigned long adts::getChannelCount(){ + if (!data || !len){ + return 0; + } + return (getChannelConfig() == 7 ? 8 : getChannelConfig()); + } + + unsigned long adts::getHeaderSize(){ + if (!data || !len){ + return 0; + } + return (data[1] & 0x01 ? 7 : 9); + } + + unsigned long adts::getPayloadSize(){ + if (!data || !len){ + return 0; + } + return (((data[3] & 0x03) << 11) | (data[4] << 3) | ((data[5] >> 5) & 0x07)) - getHeaderSize(); + } + + unsigned long adts::getSampleCount(){ + if (!data || !len){ + return 0; + } + return ((data[6] & 0x03) + 1) * 1024;//Number of samples in this frame * 1024 + } + + char * adts::getPayload() { + if (!data || !len){ + return 0; + } + return data + getHeaderSize(); + } + std::string adts::toPrettyString(){ + std::stringstream res; + res << "SyncWord: " << std::hex << (((int)data[0] << 4) | ((data[1] >> 4) & 0x0F)) << std::endl; + res << "HeaderSize: " << std::dec << getHeaderSize() << std::endl; + res << "PayloadSize: " << std::dec << getPayloadSize() << std::endl; + return res.str(); + } +} diff --git a/lib/adts.h b/lib/adts.h new file mode 100644 index 00000000..533460d1 --- /dev/null +++ b/lib/adts.h @@ -0,0 +1,25 @@ +#include + +namespace aac { + class adts { + public: + adts(); + adts(char * _data, unsigned long _len); + adts(const adts & rhs); + ~adts(); + adts& operator = (const adts & rhs); + unsigned long getAACProfile(); + unsigned long getFrequencyIndex(); + unsigned long getFrequency(); + unsigned long getChannelConfig(); + unsigned long getChannelCount(); + unsigned long getHeaderSize(); + unsigned long getPayloadSize(); + unsigned long getSampleCount(); + char * getPayload(); + std::string toPrettyString(); + private: + char * data; + unsigned long len; + }; +} diff --git a/lib/bitstream.cpp b/lib/bitstream.cpp index ef513913..c62568c7 100644 --- a/lib/bitstream.cpp +++ b/lib/bitstream.cpp @@ -26,7 +26,7 @@ namespace Utils { } } - void bitstream::append(char * input, size_t bytes) { + void bitstream::append(const char * input, size_t bytes) { if (checkBufferSize(dataSize + bytes)) { memcpy(data + dataSize, input, bytes); dataSize += bytes; diff --git a/lib/bitstream.h b/lib/bitstream.h index d34d6424..363620f4 100644 --- a/lib/bitstream.h +++ b/lib/bitstream.h @@ -12,7 +12,7 @@ namespace Utils { append(std::string(input, 1)); return *this; }; - void append(char * input, size_t bytes); + void append(const char * input, size_t bytes); void append(std::string input); long long unsigned int size(); void skip(size_t count); diff --git a/lib/dtsc.h b/lib/dtsc.h index d0764bd7..5ed7f350 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -108,7 +108,7 @@ namespace DTSC { operator bool() const; packType getVersion() const; void reInit(const char * data_, unsigned int len, bool noCopy = false); - void genericFill(long long packTime, long long packOffset, long long packTrack, char * packData, long long packDataSize, long long packBytePos, bool isKeyframe); + void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, long long packBytePos, bool isKeyframe); void getString(const char * identifier, char *& result, unsigned int & len) const; void getString(const char * identifier, std::string & result) const; void getInt(const char * identifier, int & result) const; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 49e357a7..795c2557 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -19,7 +19,10 @@ namespace DTSC { /// Copy constructor for packets, copies an existing packet with same noCopy flag as original. Packet::Packet(const Packet & rhs) { - Packet(rhs.data, rhs.dataLen, !rhs.master); + master = false; + bufferLen = 0; + data = NULL; + reInit(rhs.data, rhs.dataLen, !rhs.master); } /// Data constructor for packets, either references or copies a packet from raw data. @@ -112,7 +115,7 @@ namespace DTSC { ///\param noCopy Determines whether to make a copy or not void Packet::reInit(const char * data_, unsigned int len, bool noCopy) { if (!data_) { - DEBUG_MSG(DLVL_DEVEL, "ReInit received a null pointer with len %d, ignoring", len); + HIGH_MSG("ReInit received a null pointer with len %d, ignoring", len); null(); return; } @@ -168,7 +171,8 @@ namespace DTSC { } /// Re-initializes this Packet to contain a generic DTSC packet with the given data fields. - void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, char * packData, long long packDataSize, long long packBytePos, bool isKeyframe){ + /// When given a NULL pointer, the data is reserved and memset to 0 + void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, long long packBytePos, bool isKeyframe){ null(); master = true; //time and trackID are part of the 20-byte header. @@ -217,7 +221,11 @@ namespace DTSC { memcpy(data+offset, "\000\004data\002", 7); tmpLong = htonl(packDataSize); memcpy(data+offset+7, (char *)&tmpLong, 4); - memcpy(data+offset+11, packData, packDataSize); + if (packData){ + memcpy(data+offset+11, packData, packDataSize); + }else{ + memset(data+offset+11, 0, packDataSize); + } //finish container with 0x0000EE memcpy(data+offset+11+packDataSize, "\000\000\356", 3); } diff --git a/lib/h264.cpp b/lib/h264.cpp new file mode 100644 index 00000000..646b05c0 --- /dev/null +++ b/lib/h264.cpp @@ -0,0 +1,71 @@ +#include "h264.h" +#include +#include +#include "bitfields.h" +#include "defines.h" + +namespace h264 { + unsigned long toAnnexB(const char * data, unsigned long dataSize, char *& result){ + //toAnnexB keeps the same size. + if (!result){ + result = (char *)malloc(dataSize); + } + int offset = 0; + while (offset < dataSize){ + //Read unit size + unsigned long unitSize = Bit::btohl(data + offset); + //Write annex b header + memset(result + offset, 0x00, 3); + result[offset + 3] = 0x01; + //Copy the nal unit + memcpy(result + offset + 4, data + offset + 4, unitSize); + //Update the offset + offset += 4 + unitSize; + } + return dataSize; + } + + unsigned long fromAnnexB(const char * data, unsigned long dataSize, char *& result){ + if (!result){ + //first compute the new size. This might be the same as the annex b version, but this is not guaranteed + int offset = 0; + int newSize = 0; + while (offset < dataSize){ + const char * begin = (const char*)memmem(data + offset, dataSize - offset, "\000\000\001", 3); + begin += 3;//Initialize begin after the first 0x000001 pattern. + const char * end = (const char*)memmem(begin, dataSize - (begin - data), "\000\000\001", 3); + if (end - data > dataSize){ + end = data + dataSize; + } + //Check for 4-byte lead in's. Yes, we access -1 here + if (end[-1] == 0x00){ + end--; + } + newSize += 4 + (end - begin);//end - begin = nalSize + offset = end - data; + } + result = (char *)malloc(newSize); + } + int offset = 0; + int newOffset = 0; + while (offset < dataSize){ + const char * begin = ((const char*)memmem(data + offset, dataSize - offset, "\000\000\001", 3)) + 3;//Initialize begin after the first 0x000001 pattern. + const char * end = (const char*)memmem(begin, dataSize - (begin - data), "\000\000\001", 3); + if (end - data > dataSize){ + end = data + dataSize; + } + //Check for 4-byte lead in's. Yes, we access -1 here + if (end[-1] == 0x00){ + end--; + } + unsigned int nalSize = end - begin; + Bit::htobl(result + newOffset, nalSize); + memcpy(result + newOffset + 4, begin, nalSize); + + newOffset += 4 + nalSize; + offset = end - data; + } + return newOffset; + } +} + diff --git a/lib/h264.h b/lib/h264.h new file mode 100644 index 00000000..7f0190c9 --- /dev/null +++ b/lib/h264.h @@ -0,0 +1,4 @@ +namespace h264 { + unsigned long toAnnexB(const char * data, unsigned long dataSize, char *& result); + unsigned long fromAnnexB(const char * data, unsigned long dataSize, char *& result); +} diff --git a/lib/nal.cpp b/lib/nal.cpp index 2274fa55..a4a2e547 100644 --- a/lib/nal.cpp +++ b/lib/nal.cpp @@ -21,6 +21,20 @@ namespace h264 { return result; } + std::deque analyseH264Packet(const char * data, unsigned long len){ + std::deque res; + + int offset = 0; + while (offset < len){ + nalData entry; + entry.nalSize = Bit::btohl(data + offset); + entry.nalType = (data + offset)[4] & 0x1F; + res.push_back(entry); + offset += entry.nalSize + 4; + } + return res; + } + ///empty constructor of NAL NAL::NAL() { @@ -518,5 +532,111 @@ namespace h264 { std::cout << "second_chroma_qp_index_offset: " << bs.getExpGolomb() << std::endl; } + sequenceParameterSet::sequenceParameterSet(const char * _data, unsigned long _dataLen) : data(_data), dataLen(_dataLen) {} + + SPSMeta sequenceParameterSet::getCharacteristics() const { + SPSMeta result; + + //For calculating width + unsigned int widthInMbs = 0; + unsigned int cropHorizontal = 0; + + //For calculating height + bool mbsOnlyFlag = 0; + unsigned int heightInMapUnits = 0; + unsigned int cropVertical = 0; + + //Fill the bitstream + Utils::bitstream bs; + for (unsigned int i = 1; i < dataLen; i++) { + if (i + 2 < dataLen && (memcmp(data + i, "\000\000\003", 3) == 0)){//Emulation prevention bytes + //Yes, we increase i here + bs.append(data + i, 2); + i += 2; + } else { + //No we don't increase i here + bs.append(data + i, 1); + } + } + + char profileIdc = bs.get(8); + //Start skipping unused data + bs.skip(16); + bs.getUExpGolomb(); + if (profileIdc == 100 || profileIdc == 110 || profileIdc == 122 || profileIdc == 244 || profileIdc == 44 || profileIdc == 83 || profileIdc == 86 || profileIdc == 118 || profileIdc == 128) { + //chroma format idc + if (bs.getUExpGolomb() == 3) { + bs.skip(1); + } + bs.getUExpGolomb(); + bs.getUExpGolomb(); + bs.skip(1); + if (bs.get(1)) { + DEBUG_MSG(DLVL_DEVEL, "Scaling matrix not implemented yet"); + } + } + bs.getUExpGolomb(); + unsigned int pic_order_cnt_type = bs.getUExpGolomb(); + if (!pic_order_cnt_type) { + bs.getUExpGolomb(); + } else if (pic_order_cnt_type == 1) { + DEBUG_MSG(DLVL_DEVEL, "This part of the implementation is incomplete(2), to be continued. If this message is shown, contact developers immediately."); + } + bs.getUExpGolomb(); + bs.skip(1); + //Stop skipping data and start doing usefull stuff + + + widthInMbs = bs.getUExpGolomb() + 1; + heightInMapUnits = bs.getUExpGolomb() + 1; + + mbsOnlyFlag = bs.get(1);//Gets used in height calculation + if (!mbsOnlyFlag) { + bs.skip(1); + } + bs.skip(1); + //cropping flag + if (bs.get(1)) { + cropHorizontal = bs.getUExpGolomb();//leftOffset + cropHorizontal += bs.getUExpGolomb();//rightOffset + cropVertical = bs.getUExpGolomb();//topOffset + cropVertical += bs.getUExpGolomb();//bottomOffset + } + + //vuiParameters + if (bs.get(1)) { + //Skipping all the paramters we dont use + if (bs.get(1)) { + if (bs.get(8) == 255) { + bs.skip(32); + } + } + if (bs.get(1)) { + bs.skip(1); + } + if (bs.get(1)) { + bs.skip(4); + if (bs.get(1)) { + bs.skip(24); + } + } + if (bs.get(1)) { + bs.getUExpGolomb(); + bs.getUExpGolomb(); + } + + //Decode timing info + if (bs.get(1)) { + unsigned int unitsInTick = bs.get(32); + unsigned int timeScale = bs.get(32); + result.fps = (double)timeScale / (2 * unitsInTick); + bs.skip(1); + } + } + + result.width = (widthInMbs * 16) - (cropHorizontal * 2); + result.height = ((mbsOnlyFlag ? 1 : 2) * heightInMapUnits * 16) - (cropVertical * 2); + return result; + } } diff --git a/lib/nal.h b/lib/nal.h index b06f4caf..ce14488f 100644 --- a/lib/nal.h +++ b/lib/nal.h @@ -1,9 +1,17 @@ +#pragma once +#include #include #include #include #include "dtsc.h" namespace h264 { + struct nalData { + unsigned char nalType; + unsigned long nalSize; + }; + + std::deque analyseH264Packet(const char * data, unsigned long len); std::deque parseNalSizes(DTSC::Packet & pack); ///Struct containing pre-calculated metadata of an SPS nal unit. Width and height in pixels, fps in Hz @@ -45,4 +53,14 @@ namespace h264 { PPS(std::string & InputData): NAL(InputData) {}; void analyzePPS(); }; + + + class sequenceParameterSet { + public: + sequenceParameterSet(const char * _data, unsigned long _dataLen); + SPSMeta getCharacteristics() const; + private: + const char * data; + unsigned long dataLen; + }; }//ns h264 diff --git a/lib/procs.cpp b/lib/procs.cpp index 9d51b3d4..afca81d2 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -201,6 +201,31 @@ std::string Util::Procs::getOutputOf(char * const * argv) { return ret; } + +///This function prepares a deque for getOutputOf and automatically inserts a NULL at the end of the char* const* +char* const* Util::Procs::dequeToArgv(std::deque & argDeq){ + char** ret = (char**)malloc((argDeq.size()+1)*sizeof(char*)); + for (int i = 0; i & argDeq){ + std::string ret; + char* const* argv = dequeToArgv(argDeq);//Note: Do not edit deque before executing command + ret = getOutputOf(argv); + return ret; +} + +pid_t Util::Procs::StartPiped(std::deque & argDeq, int * fdin, int * fdout, int * fderr) { + pid_t ret; + char* const* argv = dequeToArgv(argDeq);//Note: Do not edit deque before executing command + ret = Util::Procs::StartPiped(argv, fdin, fdout, fderr); + return ret; +} + /// Starts a new process with given fds if the name is not already active. /// \return 0 if process was not started, process PID otherwise. /// \arg argv Command for this process. diff --git a/lib/procs.h b/lib/procs.h index 4a06c5f0..d7f7637c 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -6,6 +6,7 @@ #include #include #include +#include /// Contains utility code, not directly related to streaming media namespace Util { @@ -19,9 +20,12 @@ namespace Util { static void exit_handler(); static void runCmd(std::string & cmd); static void setHandler(); + static char* const* dequeToArgv(std::deque & argDeq); public: static std::string getOutputOf(char * const * argv); + static std::string getOutputOf(std::deque & argDeq); static pid_t StartPiped(char * const * argv, int * fdin, int * fdout, int * fderr); + static pid_t StartPiped(std::deque & argDeq, int * fdin, int * fdout, int * fderr); static void Stop(pid_t name); static void Murder(pid_t name); static void StopAll(); diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index be6fe876..c6186ede 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -12,6 +12,7 @@ #include "shared_memory.h" #include "stream.h" #include "procs.h" +#include "bitfields.h" namespace IPC { @@ -1025,5 +1026,43 @@ namespace IPC { } return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0)); } + + userConnection::userConnection(char * _data) { + data = _data; + } + + unsigned long userConnection::getTrackId(size_t offset) const { + if (offset >= SIMUL_TRACKS){ + WARN_MSG("Trying to get track id for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS); + return 0; + } + return Bit::btohl(data + (offset * 6)); + } + + void userConnection::setTrackId(size_t offset, unsigned long trackId) const { + if (offset >= SIMUL_TRACKS){ + WARN_MSG("Trying to set track id for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS); + return; + } + Bit::htobl(data + (offset * 6), trackId); + + } + + unsigned long userConnection::getKeynum(size_t offset) const { + if (offset >= SIMUL_TRACKS){ + WARN_MSG("Trying to get keynum for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS); + return 0; + } + return Bit::btohs(data + (offset * 6) + 4); + } + + void userConnection::setKeynum(size_t offset, unsigned long keynum) { + if (offset >= SIMUL_TRACKS){ + WARN_MSG("Trying to set keynum for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS); + return; + } + Bit::htobs(data + (offset * 6) + 4, keynum); + + } } diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 81b2b162..8514919d 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -228,4 +228,15 @@ namespace IPC { ///\brief Whether the payload has a counter, if so, it is added in front of the payload bool hasCounter; }; + + class userConnection { + public: + userConnection(char * _data); + unsigned long getTrackId(size_t offset) const; + void setTrackId(size_t offset, unsigned long trackId) const; + unsigned long getKeynum(size_t offset) const; + void setKeynum(size_t offset, unsigned long keynum); + private: + char * data; + }; } diff --git a/lib/ts_packet.cpp b/lib/ts_packet.cpp index ddf5403c..d91e4255 100644 --- a/lib/ts_packet.cpp +++ b/lib/ts_packet.cpp @@ -25,6 +25,11 @@ namespace TS { clear(); } + Packet::Packet(const Packet & rhs){ + memcpy(strBuf, rhs.strBuf, 188); + pos = 188; + } + /// This function fills a Packet from a file. /// It fills the content with the next 188 bytes int he file. /// \param Data The data to be read into the packet. @@ -34,11 +39,11 @@ namespace TS { if (!fread((void *)strBuf, 188, 1, data)) { return false; } - pos=188; if (strBuf[0] != 0x47){ INFO_MSG("Failed to read a good packet on pos %lld", pos); return false; } + pos=188; return true; } @@ -416,7 +421,7 @@ namespace TS { /// \return A character pointer to the internal packet buffer data const char * Packet::checkAndGetBuffer() const{ if (pos != 188) { - DEBUG_MSG(DLVL_ERROR, "Size invalid (%d) - invalid data from this point on", pos); + DEBUG_MSG(DLVL_HIGH, "Size invalid (%d) - invalid data from this point on", pos); } return strBuf; } @@ -568,6 +573,11 @@ namespace TS { } + ProgramAssociationTable & ProgramAssociationTable::operator = (const Packet & rhs){ + memcpy(strBuf, rhs.checkAndGetBuffer(), 188); + pos = 188; + return *this; + } ///Retrieves the current addStuffingoffset value for a PAT char ProgramAssociationTable::getOffset() const{ unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0); @@ -753,6 +763,12 @@ namespace TS { pos=4; } + ProgramMappingTable & ProgramMappingTable::operator = (const Packet & rhs) { + memcpy(strBuf, rhs.checkAndGetBuffer(), 188); + pos = 188; + return *this; + } + char ProgramMappingTable::getOffset() const{ unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0); return strBuf[loc]; diff --git a/lib/ts_packet.h b/lib/ts_packet.h index 7c0d370c..6faa6e9c 100644 --- a/lib/ts_packet.h +++ b/lib/ts_packet.h @@ -22,6 +22,7 @@ namespace TS { public: //Constructors and fillers Packet(); + Packet(const Packet & rhs); ~Packet(); bool FromPointer(const char * data); bool FromFile(FILE * data); @@ -83,6 +84,7 @@ namespace TS { class ProgramAssociationTable : public Packet { public: + ProgramAssociationTable & operator = (const Packet & rhs); char getOffset() const; char getTableId() const; short getSectionLength() const; @@ -119,6 +121,7 @@ namespace TS { class ProgramMappingTable : public Packet { public: ProgramMappingTable(); + ProgramMappingTable & operator = (const Packet & rhs); char getOffset() const; void setOffset(char newVal); char getTableId() const; diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp new file mode 100644 index 00000000..650d2f18 --- /dev/null +++ b/lib/ts_stream.cpp @@ -0,0 +1,324 @@ +#include "ts_stream.h" +#include "defines.h" +#include "h264.h" +#include "nal.h" +#include "mp4_generic.h" + +namespace TS { + void Stream::parse(char * newPack, unsigned long long bytePos) { + Packet newPacket; + newPacket.FromPointer(newPack); + parse(newPacket, bytePos); + } + + void Stream::clear(){ + pesStreams.clear(); + pesPositions.clear(); + payloadSize.clear(); + outPackets.clear(); + } + + void Stream::parse(Packet & newPack, unsigned long long bytePos) { + int tid = newPack.getPID(); + if (tid == 0){ + associationTable = newPack; + return; + } + //If we are here, the packet is not a PAT. + //First check if it is listed in the PAT as a PMT track. + int pmtCount = associationTable.getProgramCount(); + for (int i = 0; i < pmtCount; i++){ + if (tid == associationTable.getProgramPID(i)){ + mappingTable[tid] = newPack; + ProgramMappingEntry entry = mappingTable[tid].getEntry(0); + while (entry){ + unsigned long pid = entry.getElementaryPid(); + pidToCodec[pid] = entry.getStreamType(); + entry.advance(); + } + return; + } + } + //If it is not a PMT, check the list of all PMTs to see if this is a new PES track. + bool inPMT = false; + for (std::map::iterator it = mappingTable.begin(); it!= mappingTable.end(); it++){ + ProgramMappingEntry entry = it->second.getEntry(0); + while (entry){ + if (tid == entry.getElementaryPid()){ + inPMT = true; + break; + } + entry.advance(); + } + if (inPMT){ + break; + } + } + if (!inPMT){ + HIGH_MSG("Encountered a packet on track %d, but the track is not registered in any PMT", tid); + return; + } + pesStreams[tid].push_back(newPack); + pesPositions[tid].push_back(bytePos); + if (!newPack.getUnitStart() || pesStreams[tid].size() == 1){ + payloadSize[tid] += newPack.getPayloadLength(); + } + parsePES(tid); + } + + bool Stream::hasPacketOnEachTrack() const { + if (!pidToCodec.size()){ + return false; + } + for (std::map::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++){ + if (!outPackets.count(it->first) || !outPackets.at(it->first).size()){ + return false; + } + } + return true; + } + + bool Stream::hasPacket(unsigned long tid) const { + if (!pesStreams.count(tid)){ + return false; + } + if (outPackets.count(tid) && outPackets.at(tid).size()){ + return true; + } + for (int i = 1; i < pesStreams.find(tid)->second.size(); i++) { + if (pesStreams.find(tid)->second.at(i).getUnitStart()) { + return true; + } + } + return false; + } + + unsigned long long decodePTS(const char * data){ + unsigned long long time; + time = ((data[0] >> 1) & 0x07); + time <<= 15; + time |= ((int)data[1] << 7) | ((data[2] >> 1) & 0x7F); + time <<= 15; + time |= ((int)data[3] << 7) | ((data[4] >> 1) & 0x7F); + time /= 90; + return time; + } + + void Stream::parsePES(unsigned long tid){ + std::deque & inStream = pesStreams[tid]; + if (inStream.size() == 1){ + return; + } + if (!inStream.back().getUnitStart()){ + return; + } + + unsigned long long bPos = pesPositions[tid].front(); + //Create a buffer for the current PES, and remove it from the pesStreams buffer. + int paySize = payloadSize[tid]; + char * payload = (char*)malloc(paySize); + int offset = 0; + while (inStream.size() != 1){ + memcpy(payload + offset, inStream.front().getPayload(), inStream.front().getPayloadLength()); + offset += inStream.front().getPayloadLength(); + inStream.pop_front(); + pesPositions[tid].pop_front(); + } + + //Parse the PES header + offset = 0; + + while(offset < paySize){ + const char * pesHeader = payload + offset; + + //Check for large enough buffer + if ((paySize - offset) < 9 || (paySize - offset) < 9 + pesHeader[8]){ + INFO_MSG("Not enough data on track %lu, discarding remainder of data", tid); + break; + } + + //Check for valid PES lead-in + if(pesHeader[0] != 0 || pesHeader[1] != 0x00 || pesHeader[2] != 0x01){ + INFO_MSG("Invalid PES Lead in on track %lu, discarding it", tid); + break; + } + + //Read the payload size. + //Note: if the payload size is 0, then we assume the pes packet will cover the entire TS Unit. + //Note: this is technically only allowed for video pes streams. + unsigned long long realPayloadSize = (((int)pesHeader[4] << 8) | pesHeader[5]); + if (!realPayloadSize){ + realPayloadSize = paySize; + } + if (pidToCodec[tid] == AAC){ + realPayloadSize -= (3 + pesHeader[8]); + }else{ + realPayloadSize -= (9 + pesHeader[8]); + } + + //Read the metadata for this PES Packet + ///\todo Determine keyframe-ness + unsigned int timeStamp = 0; + unsigned int timeOffset = 0; + unsigned int pesOffset = 9; + if ((pesHeader[7] >> 6) & 0x02){//Check for PTS presence + timeStamp = decodePTS(pesHeader + pesOffset); + pesOffset += 5; + if (((pesHeader[7] & 0xC0) >> 6) & 0x01){//Check for DTS presence (yes, only if PTS present) + timeOffset = timeStamp; + timeStamp = decodePTS(pesHeader + pesOffset); + pesOffset += 5; + timeOffset -= timeStamp; + } + } + + if (paySize - offset - pesOffset < realPayloadSize){ + INFO_MSG("Not enough data left on track %lu.", tid); + break; + } + + char * pesPayload = payload + offset + pesOffset; + + //Create a new (empty) DTSC Packet at the end of the buffer + if (pidToCodec[tid] == AAC){ + //Parse all the ADTS packets + unsigned long offsetInPes = 0; + unsigned long samplesRead = 0; + while (offsetInPes < realPayloadSize){ + outPackets[tid].push_back(DTSC::Packet()); + aac::adts adtsPack(pesPayload + offsetInPes, realPayloadSize - offsetInPes); + if (!adtsInfo.count(tid)){ + adtsInfo[tid] = adtsPack; + } + outPackets[tid].back().genericFill(timeStamp + ((samplesRead * 1000) / adtsPack.getFrequency()), timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), bPos, 0); + samplesRead += adtsPack.getSampleCount(); + offsetInPes += adtsPack.getHeaderSize() + adtsPack.getPayloadSize(); + } + } + if (pidToCodec[tid] == H264){ + //Convert from annex b + char * parsedData = NULL; + bool isKeyFrame = false; + unsigned long parsedSize = h264::fromAnnexB(pesPayload, realPayloadSize, parsedData); + std::deque nalInfo = h264::analyseH264Packet(parsedData, parsedSize); + int dataOffset = 0; + for (std::deque::iterator it = nalInfo.begin(); it != nalInfo.end(); it++){ + switch (it->nalType){ + case 0x05: { + isKeyFrame = true; + break; + } + case 0x07: { + spsInfo[tid] = std::string(parsedData + dataOffset + 4, it->nalSize); + break; + } + case 0x08: { + ppsInfo[tid] = std::string(parsedData + dataOffset + 4, it->nalSize); + break; + } + default: break; + } + dataOffset += 4 + it->nalSize; + } + outPackets[tid].push_back(DTSC::Packet()); + outPackets[tid].back().genericFill(timeStamp, timeOffset, tid, parsedData, parsedSize, bPos, isKeyFrame); + free(parsedData); + } + //We are done with the realpayload size, reverse calculation so we know the correct offset increase. + if (pidToCodec[tid] == AAC){ + realPayloadSize += (3 + pesHeader[8]); + }else{ + realPayloadSize += (9 + pesHeader[8]); + } + offset += realPayloadSize; + } + free(payload); + payloadSize[tid] = inStream.front().getPayloadLength(); + } + + void Stream::getPacket(unsigned long tid, DTSC::Packet & pack) { + pack.null(); + if (!hasPacket(tid)){ + ERROR_MSG("Trying to obtain a packet on track %lu, but no full packet is available", tid); + return; + } + + //Handle the situation where we have DTSC Packets buffered + if (outPackets[tid].size()){ + pack = outPackets[tid].front(); + outPackets[tid].pop_front(); + if (!outPackets[tid].size()){ + payloadSize[tid] = 0; + for (std::deque::iterator it = pesStreams[tid].begin(); it != pesStreams[tid].end(); it++){ + //Break this loop on the second TS Packet with the UnitStart flag set, not on the first. + if (it->getUnitStart() && it != pesStreams[tid].begin()){ + break; + } + payloadSize[tid] += it->getPayloadLength(); + } + } + return; + } + } + + void Stream::getEarliestPacket(DTSC::Packet & pack){ + pack.null(); + if (!hasPacketOnEachTrack()){ + return; + } + + unsigned long packTime = 0xFFFFFFFFull; + unsigned long packTrack = 0; + + for (std::map >::iterator it = outPackets.begin(); it != outPackets.end(); it++){ + if (it->second.front().getTime() < packTime){ + packTrack = it->first; + packTime = it->second.front().getTime(); + } + } + pack = outPackets[packTrack].front(); + outPackets[packTrack].pop_front(); + } + + void Stream::initializeMetadata(DTSC::Meta & meta) { + for (std::map::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++){ + if (!meta.tracks.count(it->first) && it->second == H264){ + if (!spsInfo.count(it->first) || !ppsInfo.count(it->first)){ + continue; + } + meta.tracks[it->first].type = "video"; + meta.tracks[it->first].codec = "H264"; + meta.tracks[it->first].trackID = it->first; + std::string tmpBuffer = spsInfo[it->first]; + h264::sequenceParameterSet sps(spsInfo[it->first].data(), spsInfo[it->first].size()); + h264::SPSMeta spsChar = sps.getCharacteristics(); + meta.tracks[it->first].width = spsChar.width; + meta.tracks[it->first].height = spsChar.height; + meta.tracks[it->first].fpks = spsChar.fps * 1000; + MP4::AVCC avccBox; + avccBox.setVersion(1); + avccBox.setProfile(spsInfo[it->first][1]); + avccBox.setCompatibleProfiles(spsInfo[it->first][2]); + avccBox.setLevel(spsInfo[it->first][3]); + avccBox.setSPSNumber(1); + avccBox.setSPS(spsInfo[it->first]); + avccBox.setPPSNumber(1); + avccBox.setPPS(ppsInfo[it->first]); + meta.tracks[it->first].init = std::string(avccBox.payload(), avccBox.payloadSize()); + INFO_MSG("Initialized metadata for track %lu, with an SPS of %lu bytes, and a PPS of %lu bytes", it->first, spsInfo[it->first].size(), ppsInfo[it->first].size()); + } + if (!meta.tracks.count(it->first) && it->second == AAC){ + meta.tracks[it->first].type = "audio"; + meta.tracks[it->first].codec = "AAC"; + meta.tracks[it->first].trackID = it->first; + meta.tracks[it->first].size = 16; + meta.tracks[it->first].rate = adtsInfo[it->first].getFrequency(); + meta.tracks[it->first].channels = adtsInfo[it->first].getChannelCount(); + char audioInit[2];//5 bits object type, 4 bits frequency index, 4 bits channel index + audioInit[0] = ((adtsInfo[it->first].getAACProfile() & 0x1F) << 3) | ((adtsInfo[it->first].getFrequencyIndex() & 0x0E) >> 1); + audioInit[1] = ((adtsInfo[it->first].getFrequencyIndex() & 0x01) << 7) | ((adtsInfo[it->first].getChannelConfig() & 0x0F) << 3); + meta.tracks[it->first].init = std::string(audioInit, 2); + } + } + } +} diff --git a/lib/ts_stream.h b/lib/ts_stream.h new file mode 100644 index 00000000..3c6c6341 --- /dev/null +++ b/lib/ts_stream.h @@ -0,0 +1,37 @@ +#include "ts_packet.h" +#include "adts.h" +#include +#include + +namespace TS { + enum codecType { + H264 = 0x1B, + AAC = 0x0F, + AC3 = 0x81 + }; + + class Stream{ + public: + void parse(Packet & newPack, unsigned long long bytePos); + void parse(char * newPack, unsigned long long bytePos); + bool hasPacketOnEachTrack() const; + bool hasPacket(unsigned long tid) const; + void getPacket(unsigned long tid, DTSC::Packet & pack); + void getEarliestPacket(DTSC::Packet & pack); + void initializeMetadata(DTSC::Meta & meta); + void clear(); + private: + ProgramAssociationTable associationTable; + std::map mappingTable; + std::map > pesStreams; + std::map > pesPositions; + std::map payloadSize; + std::map > outPackets; + std::map pidToCodec; + std::map adtsInfo; + std::map spsInfo; + std::map ppsInfo; + + void parsePES(unsigned long tid); + }; +} diff --git a/src/analysers/tsstream_analyser.cpp b/src/analysers/tsstream_analyser.cpp new file mode 100755 index 00000000..2fc05f72 --- /dev/null +++ b/src/analysers/tsstream_analyser.cpp @@ -0,0 +1,54 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace Analysers { + /// Debugging tool for TS data. + /// Expects TS data through stdin, outputs human-readable information to stderr. + /// \return The return code of the analyser. + int analyseTS(bool validate, bool analyse, int detailLevel){ + TS::Stream tsStream; + std::map payloads; + TS::Packet packet; + long long int upTime = Util::bootSecs(); + int64_t pcr = 0; + unsigned int bytes = 0; + char packetPtr[188]; + while (std::cin.good()){ + std::cin.read(packetPtr,188); + if(std::cin.gcount() != 188){break;} + bytes += 188; + if(packet.FromPointer(packetPtr)){ + tsStream.parse(packet, bytes); + if (tsStream.hasPacketOnEachTrack()){ + DTSC::Packet dtscPack; + tsStream.getEarliestPacket(dtscPack); + std::cout << dtscPack.toJSON().toPrettyString(); + } + } + } + return 0; + } +} + +int main(int argc, char ** argv){ + Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION); + conf.addOption("analyse", JSON::fromString("{\"long\":\"analyse\", \"short\":\"a\", \"default\":1, \"long_off\":\"notanalyse\", \"short_off\":\"b\", \"help\":\"Analyse a file's contents (-a), or don't (-b) returning false on error. Default is analyse.\"}")); + conf.addOption("validate", JSON::fromString("{\"long\":\"validate\", \"short\":\"V\", \"default\":0, \"long_off\":\"notvalidate\", \"short_off\":\"X\", \"help\":\"Validate (-V) the file contents or don't validate (-X) its integrity, returning false on error. Default is don't validate.\"}")); + conf.addOption("detail", JSON::fromString("{\"long\":\"detail\", \"short\":\"D\", \"arg\":\"num\", \"default\":3, \"help\":\"Detail level of analysis.\"}")); + conf.parseArgs(argc, argv); + return Analysers::analyseTS(conf.getBool("validate"),conf.getBool("analyse"),conf.getInteger("detail")); +} diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 7d608062..f5ee109e 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -244,16 +244,16 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ //if object, delete all entries //if string, delete just the one if (Request["deletestream"].isString()){ - Controller::Storage["streams"].removeMember(Request["deletestream"].asStringRef()); + Controller::deleteStream(Request["deletestream"].asStringRef(), Controller::Storage["streams"]); } if (Request["deletestream"].isArray()){ for (JSON::ArrIter it = Request["deletestream"].ArrBegin(); it != Request["deletestream"].ArrEnd(); ++it){ - Controller::Storage["streams"].removeMember(it->asString()); + Controller::deleteStream(it->asStringRef(), Controller::Storage["streams"]); } } if (Request["deletestream"].isObject()){ for (JSON::ObjIter it = Request["deletestream"].ObjBegin(); it != Request["deletestream"].ObjEnd(); ++it){ - Controller::Storage["streams"].removeMember(it->first); + Controller::deleteStream(it->first, Controller::Storage["streams"]); } } } diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index 7744106b..319a905b 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -15,6 +15,7 @@ ///\brief Holds everything unique to the controller. namespace Controller { + std::map inputProcesses; ///\brief Checks whether two streams are equal. ///\param one The first stream for the comparison. @@ -58,6 +59,31 @@ namespace Controller { } if (URL.substr(0, 1) != "/"){ //push-style stream + if (data["udpport"].asInt()){ + std::string udpPort = data["udpport"].asString(); + //Check running + if (!inputProcesses.count(name) || !Util::Procs::isRunning(inputProcesses[name])){ + // False: start TS input + INFO_MSG("No TS Input running on port %s for stream %s, starting it", udpPort.c_str(), name.c_str()); + std::deque command; + command.push_back(Util::getMyPath() + "MistInTSStream"); + command.push_back("-s"); + command.push_back(name); + command.push_back("-p"); + command.push_back(udpPort); + command.push_back(URL); + int stdIn = 0; + int stdOut = 1; + int stdErr = 2; + pid_t program = Util::Procs::StartPiped(command, &stdIn, &stdOut, &stdErr); + if (program){ + inputProcesses[name] = program; + } + } + //Check hasViewers + // True: data["online"] = 2; + // False: data["online"] =11; + } return; } if (URL.substr(0, 1) == "/"){ @@ -204,13 +230,12 @@ namespace Controller { for (JSON::ObjIter jit = out.ObjBegin(); jit != out.ObjEnd(); jit++){ if ( !in.isMember(jit->first)){ toDelete.insert(jit->first); - Log("STRM", std::string("Deleted stream ") + jit->first); } } //actually delete the streams while (toDelete.size() > 0){ std::string deleting = *(toDelete.begin()); - out.removeMember(deleting); + deleteStream(deleting, out); toDelete.erase(deleting); } @@ -229,4 +254,19 @@ namespace Controller { } + void deleteStream(const std::string & name, JSON::Value & out) { + if (!out.isMember(name)){ + return; + } + Log("STRM", std::string("Deleted stream ") + name); + out.removeMember(name); + if (inputProcesses.count(name)){ + pid_t procId = inputProcesses[name]; + if (Util::Procs::isRunning(procId)){ + Util::Procs::Stop(procId); + } + inputProcesses.erase(name); + } + } + } //Controller namespace diff --git a/src/controller/controller_streams.h b/src/controller/controller_streams.h index 3a1c9abc..7ef29c92 100644 --- a/src/controller/controller_streams.h +++ b/src/controller/controller_streams.h @@ -6,9 +6,11 @@ namespace Controller { bool CheckAllStreams(JSON::Value & data); void CheckStreams(JSON::Value & in, JSON::Value & out); void AddStreams(JSON::Value & in, JSON::Value & out); + void deleteStream(const std::string & name, JSON::Value & out); struct liveCheck { long long int lastms; long long int last_active; }; + } //Controller namespace diff --git a/src/input/input.cpp b/src/input/input.cpp index ef9e03a1..fcd17b9e 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include "input.h" #include @@ -10,25 +11,29 @@ namespace Mist { Input * Input::singleton = NULL; - - void Input::userCallback(char * data, size_t len, unsigned int id){ - for (int i = 0; i < 5; i++){ - unsigned long tid = ((unsigned long)(data[i*6]) << 24) | ((unsigned long)(data[i*6+1]) << 16) | ((unsigned long)(data[i*6+2]) << 8) | ((unsigned long)(data[i*6+3])); - if (tid){ - unsigned long keyNum = ((unsigned long)(data[i*6+4]) << 8) | ((unsigned long)(data[i*6+5])); + + void Input::userCallback(char * data, size_t len, unsigned int id) { + for (int i = 0; i < 5; i++) { + unsigned long tid = ((unsigned long)(data[i * 6]) << 24) | ((unsigned long)(data[i * 6 + 1]) << 16) | ((unsigned long)(data[i * 6 + 2]) << 8) | ((unsigned long)(data[i * 6 + 3])); + if (tid) { + unsigned long keyNum = ((unsigned long)(data[i * 6 + 4]) << 8) | ((unsigned long)(data[i * 6 + 5])); bufferFrame(tid, keyNum + 1);//Try buffer next frame } } } - - void Input::callbackWrapper(char * data, size_t len, unsigned int id){ + + void Input::callbackWrapper(char * data, size_t len, unsigned int id) { singleton->userCallback(data, 30, id);//call the userCallback for this input } - + Input::Input(Util::Config * cfg) : InOutBase() { config = cfg; +#ifdef INPUT_LIVE + standAlone = false; +#else standAlone = true; - +#endif + JSON::Value option; option["long"] = "json"; option["short"] = "j"; @@ -69,36 +74,35 @@ namespace Mist { option.null(); /*LTS-END*/ - capa["optional"]["debug"]["name"] = "debug"; capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed."; capa["optional"]["debug"]["option"] = "--debug"; capa["optional"]["debug"]["type"] = "debug"; - + packTime = 0; lastActive = Util::epoch(); playing = 0; playUntil = 0; - + singleton = this; isBuffer = false; } - void Input::checkHeaderTimes(std::string streamFile){ - if ( streamFile == "-" ){ + void Input::checkHeaderTimes(std::string streamFile) { + if (streamFile == "-") { return; } std::string headerFile = streamFile + ".dtsh"; - FILE * tmp = fopen(headerFile.c_str(),"r"); - if (tmp == NULL){ - DEBUG_MSG(DLVL_HIGH, "Can't open header: %s. Assuming all is fine.", headerFile.c_str() ); + FILE * tmp = fopen(headerFile.c_str(), "r"); + if (tmp == NULL) { + DEBUG_MSG(DLVL_HIGH, "Can't open header: %s. Assuming all is fine.", headerFile.c_str()); return; - } + } struct stat bufStream; struct stat bufHeader; //fstat(fileno(streamFile), &bufStream); //fstat(fileno(tmp), &bufHeader); - if (stat(streamFile.c_str(), &bufStream) !=0 || stat(headerFile.c_str(), &bufHeader) !=0){ + if (stat(streamFile.c_str(), &bufStream) != 0 || stat(headerFile.c_str(), &bufHeader) != 0) { DEBUG_MSG(DLVL_HIGH, "Could not compare stream and header timestamps - assuming all is fine."); fclose(tmp); return; @@ -106,36 +110,47 @@ namespace Mist { int timeStream = bufStream.st_mtime; int timeHeader = bufHeader.st_mtime; - fclose(tmp); - if (timeHeader < timeStream){ + fclose(tmp); + if (timeHeader < timeStream) { //delete filename - INFO_MSG("Overwriting outdated DTSH header file: %s ",headerFile.c_str()); + INFO_MSG("Overwriting outdated DTSH header file: %s ", headerFile.c_str()); remove(headerFile.c_str()); } } int Input::run() { + if (streamName != "") { + config->getOption("streamname") = streamName; + } streamName = config->getString("streamname"); if (config->getBool("json")) { std::cout << capa.toString() << std::endl; return 0; } - if (!setup()){ + if (!setup()) { std::cerr << config->getString("cmd") << " setup failed." << std::endl; return 0; } +//Do not read the header if this is a live stream +#ifndef INPUT_LIVE checkHeaderTimes(config->getString("input")); - if (!readHeader()){ + if (!readHeader()) { std::cerr << "Reading header for " << config->getString("input") << " failed." << std::endl; return 0; } parseHeader(); - +#endif + +//Live inputs only have a serve() mode +#ifndef INPUT_LIVE if (!config->getString("streamname").size()){ convert(); }else{ +#endif serve(); +#ifndef INPUT_LIVE } +#endif return 0; } @@ -173,6 +188,35 @@ namespace Mist { void Input::serve(){ char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); +#ifdef INPUT_LIVE + Util::startInput(streamName); + userClient = IPC::sharedClient(userPageName, 30, true); + getNext(); + while (thisPacket || config->is_active){ + unsigned long tid = thisPacket.getTrackId(); + //Check for eligibility of track + IPC::userConnection userConn(userClient.getData()); + if (trackOffset.count(tid) && !userConn.getTrackId(trackOffset[tid])){ + trackOffset.erase(tid); + trackState.erase(tid); + trackMap.erase(tid); + trackBuffer.erase(tid); + pagesByTrack.erase(tid); + metaPages.erase(tid); + curPageNum.erase(tid); + curPage.erase(tid); + INFO_MSG("Erasing track %d", tid); + continue; + } + if (thisPacket){ + continueNegotiate(thisPacket.getTrackId()); + bufferLivePacket(thisPacket); + } + getNext(); + userClient.keepAlive(); + } + userClient.finish(); +#else userPage.init(userPageName, PLAY_EX_SIZE, true); if (!isBuffer){ for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ @@ -185,8 +229,8 @@ namespace Mist { long long int activityCounter = Util::bootSecs(); while ((Util::bootSecs() - activityCounter) < 10 && config->is_active){//10 second timeout Util::wait(1000); - removeUnused(); userPage.parseEach(callbackWrapper); + removeUnused(); if (userPage.amount){ activityCounter = Util::bootSecs(); DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount); @@ -194,40 +238,41 @@ namespace Mist { DEBUG_MSG(DLVL_INSANE, "Timer running"); } } +#endif finish(); DEBUG_MSG(DLVL_DEVEL,"Input for stream %s closing clean", streamName.c_str()); //end player functionality } - void Input::finish(){ - for( std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){ - for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ + void Input::finish() { + for (std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) { + for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) { it2->second = 1; } } removeUnused(); - if (standAlone){ - for (std::map::iterator it = metaPages.begin(); it != metaPages.end(); it++){ + if (standAlone) { + for (std::map::iterator it = metaPages.begin(); it != metaPages.end(); it++) { it->second.master = true; } } } - void Input::removeUnused(){ - for (std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){ - for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ + void Input::removeUnused() { + for (std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) { + for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) { it2->second--; } bool change = true; - while (change){ + while (change) { change = false; - for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ - if (!it2->second){ + for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) { + if (!it2->second) { bufferRemove(it->first, it2->first); pageCounter[it->first].erase(it2->first); - for (int i = 0; i < 8192; i += 8){ + for (int i = 0; i < 8192; i += 8) { unsigned int thisKeyNum = ntohl(((((long long int *)(metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF); - if (thisKeyNum == it2->first){ + if (thisKeyNum == it2->first) { (((long long int *)(metaPages[it->first].mapped + i))[0]) = 0; } } @@ -238,106 +283,106 @@ namespace Mist { } } } - - void Input::parseHeader(){ - DEBUG_MSG(DLVL_DONTEVEN,"Parsing the header"); + + void Input::parseHeader() { + DEBUG_MSG(DLVL_DONTEVEN, "Parsing the header"); selectedTracks.clear(); std::stringstream trackSpec; for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { DEBUG_MSG(DLVL_VERYHIGH, "Track %u encountered", it->first); - if (trackSpec.str() != ""){ + if (trackSpec.str() != "") { trackSpec << " "; } trackSpec << it->first; DEBUG_MSG(DLVL_VERYHIGH, "Trackspec now %s", trackSpec.str().c_str()); - for (std::deque::iterator it2 = it->second.keys.begin(); it2 != it->second.keys.end(); it2++){ + for (std::deque::iterator it2 = it->second.keys.begin(); it2 != it->second.keys.end(); it2++) { keyTimes[it->first].insert(it2->getTime()); } } trackSelect(trackSpec.str()); - + bool hasKeySizes = true; - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (!it->second.keySizes.size()){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { + if (!it->second.keySizes.size()) { hasKeySizes = false; break; } } - if (hasKeySizes){ - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (hasKeySizes) { + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { bool newData = true; - for (int i = 0; i < it->second.keys.size(); i++){ - if (newData){ + for (int i = 0; i < it->second.keys.size(); i++) { + if (newData) { //i+1 because keys are 1-indexed - pagesByTrack[it->first][i+1].firstTime = it->second.keys[i].getTime(); + pagesByTrack[it->first][i + 1].firstTime = it->second.keys[i].getTime(); newData = false; } pagesByTrack[it->first].rbegin()->second.keyNum++; pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts(); pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i]; - if (pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE){ + if (pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE) { newData = true; } } } - }else{ - std::map curData; - std::map bookKeeping; - - seek(0); - getNext(); + } else { + std::map curData; + std::map bookKeeping; - while(thisPacket){//loop through all - unsigned int tid = thisPacket.getTrackId(); - if (!tid){ - getNext(false); - continue; - } - if (!bookKeeping.count(tid)){ - bookKeeping[tid].first = 1; - bookKeeping[tid].curPart = 0; - bookKeeping[tid].curKey = 0; - - curData[tid].lastKeyTime = 0xFFFFFFFF; - curData[tid].keyNum = 1; - curData[tid].partNum = 0; - curData[tid].dataSize = 0; - curData[tid].curOffset = 0; - curData[tid].firstTime = myMeta.tracks[tid].keys[0].getTime(); + seek(0); + getNext(); - } - if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum){ - if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) { - pagesByTrack[tid][bookKeeping[tid].first] = curData[tid]; - bookKeeping[tid].first += curData[tid].keyNum; - curData[tid].keyNum = 0; - curData[tid].dataSize = 0; - curData[tid].firstTime = myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getTime(); + while (thisPacket) { //loop through all + unsigned int tid = thisPacket.getTrackId(); + if (!tid) { + getNext(false); + continue; + } + if (!bookKeeping.count(tid)) { + bookKeeping[tid].first = 1; + bookKeeping[tid].curPart = 0; + bookKeeping[tid].curKey = 0; + + curData[tid].lastKeyTime = 0xFFFFFFFF; + curData[tid].keyNum = 1; + curData[tid].partNum = 0; + curData[tid].dataSize = 0; + curData[tid].curOffset = 0; + curData[tid].firstTime = myMeta.tracks[tid].keys[0].getTime(); + + } + if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum) { + if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) { + pagesByTrack[tid][bookKeeping[tid].first] = curData[tid]; + bookKeeping[tid].first += curData[tid].keyNum; + curData[tid].keyNum = 0; + curData[tid].dataSize = 0; + curData[tid].firstTime = myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getTime(); + } + bookKeeping[tid].curKey++; + curData[tid].keyNum++; + curData[tid].partNum = 0; + } + curData[tid].dataSize += thisPacket.getDataLen(); + curData[tid].partNum ++; + bookKeeping[tid].curPart ++; + DEBUG_MSG(DLVL_DONTEVEN, "Track %ld:%llu on page %d@%llu (len:%d), being part %lu of key %lu", thisPacket.getTrackId(), thisPacket.getTime(), bookKeeping[tid].first, curData[tid].dataSize, thisPacket.getDataLen(), curData[tid].partNum, bookKeeping[tid].first + curData[tid].keyNum); + getNext(false); + } + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { + if (curData.count(it->first) && !pagesByTrack[it->first].count(bookKeeping[it->first].first)) { + pagesByTrack[it->first][bookKeeping[it->first].first] = curData[it->first]; } - bookKeeping[tid].curKey++; - curData[tid].keyNum++; - curData[tid].partNum = 0; } - curData[tid].dataSize += thisPacket.getDataLen(); - curData[tid].partNum ++; - bookKeeping[tid].curPart ++; - DEBUG_MSG(DLVL_DONTEVEN, "Track %ld:%llu on page %d@%llu (len:%d), being part %lu of key %lu", thisPacket.getTrackId(), thisPacket.getTime(), bookKeeping[tid].first, curData[tid].dataSize, thisPacket.getDataLen(), curData[tid].partNum, bookKeeping[tid].first+curData[tid].keyNum); - getNext(false); } for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { - if (curData.count(it->first) && !pagesByTrack[it->first].count(bookKeeping[it->first].first)){ - pagesByTrack[it->first][bookKeeping[it->first].first] = curData[it->first]; - } - } - } - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (!pagesByTrack.count(it->first)){ - DEBUG_MSG(DLVL_WARN, "No pages for track %d found", it->first); - }else{ - DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size()); - for (std::map::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++){ - DEBUG_MSG(DLVL_VERYHIGH, "Page %lu-%lu, (%llu bytes)", it2->first, it2->first + it2->second.keyNum - 1, it2->second.dataSize); - } + if (!pagesByTrack.count(it->first)) { + DEBUG_MSG(DLVL_WARN, "No pages for track %d found", it->first); + } else { + DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size()); + for (std::map::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++) { + DEBUG_MSG(DLVL_VERYHIGH, "Page %lu-%lu, (%llu bytes)", it2->first, it2->first + it2->second.keyNum - 1, it2->second.dataSize); + } } } } @@ -345,15 +390,24 @@ namespace Mist { bool Input::bufferFrame(unsigned int track, unsigned int keyNum){ VERYHIGH_MSG("bufferFrame for stream %s, track %u, key %u", streamName.c_str(), track, keyNum); - if (keyNum > myMeta.tracks[track].keys.size()){ + if (keyNum >= myMeta.tracks[track].keys.size()){ //End of movie here, returning true to avoid various error messages VERYHIGH_MSG("Key number is higher than total key count. Cancelling bufferFrame"); return true; } - if (keyNum < 1){keyNum = 1;} - //abort in case already buffered - int pageNumber = bufferedOnPage(track, keyNum); - if (pageNumber){ + if (keyNum < 1) { + keyNum = 1; + } + if (isBuffered(track, keyNum)) { + //get corresponding page number + int pageNumber = 0; + for (std::map::iterator it = pagesByTrack[track].begin(); it != pagesByTrack[track].end(); it++) { + if (it->first <= keyNum) { + pageNumber = it->first; + } else { + break; + } + } pageCounter[track][pageNumber] = 15; VERYHIGH_MSG("Track %u, key %u is already buffered in page %d. Cancelling bufferFrame", track, keyNum, pageNumber); return true; @@ -366,7 +420,7 @@ namespace Mist { INFO_MSG("Loading key %u from page %lu", keyNum, (--(pagesByTrack[track].upper_bound(keyNum)))->first); keyNum = (--(pagesByTrack[track].upper_bound(keyNum)))->first; if (!bufferStart(track, keyNum)){ - WARN_MSG("bufferStart failed! Cancelling bufferFrame", track); + WARN_MSG("bufferStart failed! Cancelling bufferFrame"); return false; } @@ -375,16 +429,16 @@ namespace Mist { trackSelect(trackSpec.str()); seek(myMeta.tracks[track].keys[keyNum - 1].getTime()); long long unsigned int stopTime = myMeta.tracks[track].lastms + 1; - if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + pagesByTrack[track][keyNum].keyNum){ + if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + pagesByTrack[track][keyNum].keyNum) { stopTime = myMeta.tracks[track].keys[keyNum - 1 + pagesByTrack[track][keyNum].keyNum].getTime(); } DEBUG_MSG(DLVL_HIGH, "Playing from %llu to %llu", myMeta.tracks[track].keys[keyNum - 1].getTime(), stopTime); getNext(); //in case earlier seeking was inprecise, seek to the exact point - while (thisPacket && thisPacket.getTime() < (unsigned long long)myMeta.tracks[track].keys[keyNum - 1].getTime()){ + while (thisPacket && thisPacket.getTime() < (unsigned long long)myMeta.tracks[track].keys[keyNum - 1].getTime()) { getNext(); } - while (thisPacket && thisPacket.getTime() < stopTime){ + while (thisPacket && thisPacket.getTime() < stopTime) { bufferNext(thisPacket); getNext(); } @@ -393,39 +447,39 @@ namespace Mist { pageCounter[track][keyNum] = 15; return true; } - - bool Input::atKeyFrame(){ + + bool Input::atKeyFrame() { static std::map lastSeen; //not in keyTimes? We're not at a keyframe. unsigned int c = keyTimes[thisPacket.getTrackId()].count(thisPacket.getTime()); - if (!c){ + if (!c) { return false; } //skip double times - if (lastSeen.count(thisPacket.getTrackId()) && lastSeen[thisPacket.getTrackId()] == thisPacket.getTime()){ + if (lastSeen.count(thisPacket.getTrackId()) && lastSeen[thisPacket.getTrackId()] == thisPacket.getTime()) { return false; } //set last seen, and return true lastSeen[thisPacket.getTrackId()] = thisPacket.getTime(); return true; } - - void Input::play(int until){ + + void Input::play(int until) { playing = -1; playUntil = until; initialTime = 0; benchMark = Util::getMS(); } - void Input::playOnce(){ - if (playing <= 0){ + void Input::playOnce() { + if (playing <= 0) { playing = 1; } ++playing; benchMark = Util::getMS(); } - void Input::quitPlay(){ + void Input::quitPlay() { playing = 0; } } diff --git a/src/input/input.h b/src/input/input.h index 725051fe..e49ac0cd 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -20,6 +20,7 @@ namespace Mist { public: Input(Util::Config * cfg); virtual int run(); + virtual void argumentsParsed(){} virtual ~Input() {}; protected: static void callbackWrapper(char * data, size_t len, unsigned int id); @@ -36,11 +37,10 @@ namespace Mist { virtual void removeUnused(); virtual void trackSelect(std::string trackSpec){}; virtual void userCallback(char * data, size_t len, unsigned int id); - - void serve(); - void convert(); + virtual void convert(); + virtual void serve(); - void parseHeader(); + virtual void parseHeader(); bool bufferFrame(unsigned int track, unsigned int keyNum); unsigned int packTime;///Media-timestamp of the last packet. diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 59e6a553..00f1a3a6 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -13,7 +13,7 @@ #include "input_buffer.h" #ifndef TIMEOUTMULTIPLIER -#define TIMEOUTMULTIPLIER 10 +#define TIMEOUTMULTIPLIER 2 #endif namespace Mist { @@ -71,6 +71,17 @@ namespace Mist { capa["optional"]["segmentsize"]["type"] = "uint"; capa["optional"]["segmentsize"]["default"] = 5000LL; option.null(); + option["arg"] = "integer"; + option["long"] = "udp-port"; + option["short"] = "U"; + option["help"] = "The UDP port on which to listen for TS Packets"; + option["value"].append(0LL); + config->addOption("udpport", option); + capa["optional"]["udpport"]["name"] = "TS/UDP port"; + capa["optional"]["udpport"]["help"] = "The UDP port on which to listen for TS Packets, or 0 for disabling TS Input"; + capa["optional"]["udpport"]["option"] = "--udp-port"; + capa["optional"]["udpport"]["type"] = "uint"; + capa["optional"]["udpport"]["default"] = 0LL; /*LTS-end*/ capa["source_match"] = "push://*"; capa["priority"] = 9ll; @@ -334,12 +345,21 @@ namespace Mist { curPage.erase(tid); bufferLocations[tid].erase(bufferLocations[tid].begin()); } + //Reset the userpage, to allow repushing from TS + IPC::userConnection userConn(pushLocation[it->first]); + for (int i = 0; i < SIMUL_TRACKS; i++){ + if (userConn.getTrackId(i) == it->first) { + userConn.setTrackId(i, 0); + userConn.setKeynum(i, 0); + break; + } + } curPageNum.erase(it->first); metaPages[it->first].master = true; metaPages.erase(it->first); activeTracks.erase(it->first); pushLocation.erase(it->first); - myMeta.tracks.erase(it); + myMeta.tracks.erase(it->first); changed = true; break; } @@ -389,10 +409,10 @@ namespace Mist { //Get the counter of this user char counter = (*(data - 1)); //Each user can have at maximum SIMUL_TRACKS elements in their userpage. + IPC::userConnection userConn(data); for (int index = 0; index < SIMUL_TRACKS; index++){ - char * thisData = data + (index * 6); //Get the track id from the current element - unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3]; + unsigned long value = userConn.getTrackId(index); //Skip value 0xFFFFFFFF as this indicates a previously declined track if (value == 0xFFFFFFFF){ continue; @@ -429,15 +449,11 @@ namespace Mist { //Add the temporary track id to the list of tracks that are currently being negotiated negotiatingTracks.insert(tempMapping); //Write the temporary id to the userpage element - thisData[0] = (tempMapping >> 24) & 0xFF; - thisData[1] = (tempMapping >> 16) & 0xFF; - thisData[2] = (tempMapping >> 8) & 0xFF; - thisData[3] = (tempMapping) & 0xFF; + userConn.setTrackId(index, tempMapping); //Obtain the original track number for the pushing process - unsigned long originalTrack = ((long)(thisData[4]) << 8) | thisData[5]; + unsigned long originalTrack = userConn.getKeynum(index); //Overwrite it with 0xFFFF - thisData[4] = 0xFF; - thisData[5] = 0xFF; + userConn.setKeynum(index, 0xFFFF); DEBUG_MSG(DLVL_HIGH, "Incoming track %lu from pushing process %d has now been assigned temporary id %llu", originalTrack, id, tempMapping); } @@ -498,7 +514,6 @@ namespace Mist { } } /*LTS-END*/ - //Remove the "negotiate" status in either case negotiatingTracks.erase(value); //Set master to true before erasing the page, because we are responsible for cleaning up unused pages @@ -547,25 +562,20 @@ namespace Mist { pushLocation[finalMap] = data; //Initialize the metadata for this track if it was not in place yet. if (!myMeta.tracks.count(finalMap)){ - DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap); + DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap); myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second; myMeta.tracks[finalMap].trackID = finalMap; } - //Write the final mapped track number to the user page element - thisData[0] = (finalMap >> 24) & 0xFF; - thisData[1] = (finalMap >> 16) & 0xFF; - thisData[2] = (finalMap >> 8) & 0xFF; - thisData[3] = (finalMap) & 0xFF; - //Write the key number to start pushing from to to the userpage element. + //Write the final mapped track number and keyframe number to the user page element //This is used to resume pushing as well as pushing new tracks - unsigned long keyNum = myMeta.tracks[finalMap].keys.size(); - thisData[4] = (keyNum >> 8) & 0xFF; - thisData[5] = keyNum & 0xFF; + userConn.setTrackId(index, finalMap); + userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size()); //Update the metadata to reflect all changes updateMeta(); } //If the track is active, and this is the element responsible for pushing it if (activeTracks.count(value) && pushLocation[value] == data){ + INFO_MSG("Track is live and pushin'"); //Open the track index page if we dont have it open yet if (!metaPages.count(value) || !metaPages[value].mapped){ char firstPage[NAME_BUFFER_SIZE]; @@ -581,6 +591,7 @@ namespace Mist { } void inputBuffer::updateTrackMeta(unsigned long tNum){ + INFO_MSG("Updating meta for track %d", tNum); //Store a reference for easier access std::map & locations = bufferLocations[tNum]; @@ -591,6 +602,7 @@ namespace Mist { continue; } unsigned long keyNum = ntohl(tmpOffset[0]); + INFO_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1])); //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. if (!locations.count(keyNum)){ @@ -599,7 +611,6 @@ namespace Mist { locations[keyNum].pageNum = keyNum; locations[keyNum].keyNum = ntohl(tmpOffset[1]); } - //Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest for (std::map::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++){ updateMetaFromPage(tNum, pageIt->first); @@ -608,6 +619,7 @@ namespace Mist { } void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum){ + INFO_MSG("Updating meta for track %d page %d", tNum, pageNum); DTSCPageData & pageData = bufferLocations[tNum][pageNum]; //If the current page is over its 8mb "splitting" boundary @@ -615,6 +627,7 @@ namespace Mist { //And the last keyframe in the parsed metadata is further in the stream than this page if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()){ //Assume the entire page is already parsed + INFO_MSG("Assuming its already done", tNum, pageNum); return; } } diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index d2ddf6c2..864c1f66 100755 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -26,22 +26,36 @@ namespace Mist { capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][1u].append("AAC"); capa["codecs"][0u][1u].append("AC3"); + + capa["optional"]["port"]["name"] = "UDP Port"; + capa["optional"]["port"]["help"] = "The udp port on which to listen for incoming UDP Packets"; + capa["optional"]["port"]["type"] = "uint"; + capa["optional"]["port"]["default"] = 9876; + capa["optional"]["port"]["option"] = "--port"; + cfg->addOption("port", + JSON::fromString("{\"arg\":\"integer\",\"value\":9876,\"short\":\"p\",\"long\":\"port\",\"help\":\"The udp port on which to listen for incoming UDP Packets.\"}")); + + pushing = false; } ///Setup of TS Input bool inputTS::setup() { +#ifdef INPUT_LIVE if (config->getString("input") == "-") { inFile = stdin; }else{ - inFile = fopen(config->getString("input").c_str(), "r"); + pushing = true; + udpCon.setBlocking(false); + udpCon.bind(config->getInteger("port")); } - - if (config->getString("output") != "-") { - std::cerr << "Output to non-stdout not yet supported" << std::endl; +#else + if (config->getString("input") != "-"){ + inFile = fopen(config->getString("input").c_str(), "r"); } if (!inFile) { return false; } +#endif return true; } @@ -62,154 +76,6 @@ namespace Mist { } } - void inputTS::parsePESHeader(int tid, pesBuffer & buf){ - if (buf.data.size() < 9){ - return; - } - if (buf.data.size() < 9 + buf.data[8]){ - return; - } - if( (((int)buf.data[0] << 16) | ((int)buf.data[1] << 8) | buf.data[2]) != 0x000001){ - DEBUG_MSG(DLVL_WARN, "Parsing PES for track %d failed due to incorrect header (%0.6X), throwing away", tid, (((int)buf.data[0] << 16) | ((int)buf.data[1] << 8) | buf.data[2]) ); - buf.data = ""; - return; - } - buf.len = (((int)buf.data[4] << 8) | buf.data[5]) - (3 + buf.data[8]); - if ((buf.data[7] >> 6) & 0x02){//Check for PTS presence - buf.time = ((buf.data[9] >> 1) & 0x07); - buf.time <<= 15; - buf.time |= ((int)buf.data[10] << 7) | ((buf.data[11] >> 1) & 0x7F); - buf.time <<= 15; - buf.time |= ((int)buf.data[12] << 7) | ((buf.data[13] >> 1) & 0x7F); - buf.time /= 90; - if (((buf.data[7] & 0xC0) >> 6) & 0x01){//Check for DTS presence (yes, only if PTS present) - buf.offset = buf.time; - buf.time = ((buf.data[14] >> 1) & 0x07); - buf.time <<= 15; - buf.time |= ((int)buf.data[15] << 7) | ((buf.data[16] >> 1) & 0x7F); - buf.time <<= 15; - buf.time |= ((int)buf.data[17] << 7) | ((buf.data[18] >> 1) & 0x7F); - buf.time /= 90; - buf.offset -= buf.time; - } - } - if (!firstTimes.count(tid)){ - firstTimes[tid] = buf.time; - } - buf.time -= firstTimes[tid]; - buf.data.erase(0, 9 + buf.data[8]); - } - - void inputTS::parsePESPayload(int tid, pesBuffer & buf){ - if (myMeta.tracks[tid].codec == "H264"){ - parseH264PES(tid, buf); - } - if (myMeta.tracks[tid].codec == "AAC"){ - parseAACPES(tid, buf); - } - } - - - void inputTS::parseAACPES(int tid, pesBuffer & buf){ - if (!buf.data.size()){ - buf.len = 0; - return; - } - if (myMeta.tracks[tid].init == ""){ - char audioInit[2];//5 bits object type, 4 bits frequency index, 4 bits channel index - char AACProfile = ((buf.data[2] >> 6) & 0x03) + 1; - char frequencyIndex = ((buf.data[2] >> 2) & 0x0F); - char channelConfig = ((buf.data[2] & 0x01) << 2) | ((buf.data[3] >> 6) & 0x03); - switch(frequencyIndex){ - case 0: myMeta.tracks[tid].rate = 96000; break; - case 1: myMeta.tracks[tid].rate = 88200; break; - case 2: myMeta.tracks[tid].rate = 64000; break; - case 3: myMeta.tracks[tid].rate = 48000; break; - case 4: myMeta.tracks[tid].rate = 44100; break; - case 5: myMeta.tracks[tid].rate = 32000; break; - case 6: myMeta.tracks[tid].rate = 24000; break; - case 7: myMeta.tracks[tid].rate = 22050; break; - case 8: myMeta.tracks[tid].rate = 16000; break; - case 9: myMeta.tracks[tid].rate = 12000; break; - case 10: myMeta.tracks[tid].rate = 11025; break; - case 11: myMeta.tracks[tid].rate = 8000; break; - case 12: myMeta.tracks[tid].rate = 7350; break; - default: myMeta.tracks[tid].rate = 0; break; - } - myMeta.tracks[tid].channels = channelConfig; - if (channelConfig == 7){ - myMeta.tracks[tid].channels = 8; - } - audioInit[0] = ((AACProfile & 0x1F) << 3) | ((frequencyIndex & 0x0E) >> 1); - audioInit[1] = ((frequencyIndex & 0x01) << 7) | ((channelConfig & 0x0F) << 3); - myMeta.tracks[tid].init = std::string(audioInit, 2); - //\todo This value is right now hardcoded, maybe fix this when we know how - myMeta.tracks[tid].size = 16; - } - buf.len = (((buf.data[3] & 0x03) << 11) | (buf.data[4] << 3) | ((buf.data[5] >> 5) & 0x07)) - (buf.data[1] & 0x01 ? 7 :9); - buf.curSampleCount += 1024 * ((buf.data[6] & 0x3) + 1);//Number of frames * samples per frame(1024); - buf.data.erase(0, (buf.data[1] & 0x01 ? 7 : 9));//Substract header - } - - void inputTS::parseH264PES(int tid, pesBuffer & buf){ - static char annexB[] = {0x00,0x00,0x01}; - static char nalLen[4]; - - int nalLength = 0; - std::string newData; - int pos = 0; - int nxtPos = buf.data.find(annexB, pos, 3); - //Rewrite buf.data from annexB to size-prefixed h.264 - while (nxtPos != std::string::npos){ - //Detect next packet (if any) and deduce current packet length - pos = nxtPos + 3; - nxtPos = buf.data.find(annexB, pos, 3); - if (nxtPos == std::string::npos){ - nalLength = buf.data.size() - pos; - }else{ - nalLength = nxtPos - pos; - if (buf.data[nxtPos - 1] == 0x00){//4-byte annexB header - nalLength--; - } - } - //Do nal type specific stuff - switch (buf.data[pos] & 0x1F){ - case 0x05: buf.isKey = true; break; - case 0x07: buf.sps = buf.data.substr(pos, nalLength); break; - case 0x08: buf.pps = buf.data.substr(pos, nalLength); break; - default: break; - } - if ((buf.data[pos] & 0x1F) != 0x07 && (buf.data[pos] & 0x1F) != 0x08 && (buf.data[pos] & 0x1F) != 0x09){ - //Append length + payload - nalLen[0] = (nalLength >> 24) & 0xFF; - nalLen[1] = (nalLength >> 16) & 0xFF; - nalLen[2] = (nalLength >> 8) & 0xFF; - nalLen[3] = nalLength & 0xFF; - newData.append(nalLen, 4); - newData += buf.data.substr(pos, nalLength); - } - } - buf.data = newData; - buf.len = newData.size(); - //If this packet had both a Sequence Parameter Set (sps) and a Picture Parameter Set (pps), calculate the metadata for the stream - if (buf.sps != "" && buf.pps != ""){ - MP4::AVCC avccBox; - avccBox.setVersion(1); - avccBox.setProfile(buf.sps[1]); - avccBox.setCompatibleProfiles(buf.sps[2]); - avccBox.setLevel(buf.sps[3]); - avccBox.setSPSNumber(1); - avccBox.setSPS(buf.sps); - avccBox.setPPSNumber(1); - avccBox.setPPS(buf.pps); - myMeta.tracks[tid].init = std::string(avccBox.payload(), avccBox.payloadSize()); - h264::SPS tmpNal(buf.sps, true); - h264::SPSMeta tmpMeta = tmpNal.getCharacteristics(); - myMeta.tracks[tid].width = tmpMeta.width; - myMeta.tracks[tid].height = tmpMeta.height; - myMeta.tracks[tid].fpks = tmpMeta.fps * 1000; - } - } ///Reads headers from a TS stream, and saves them into metadata ///It works by going through the entire TS stream, and every time @@ -229,288 +95,93 @@ namespace Mist { TS::Packet packet;//to analyse and extract data fseek(inFile, 0, SEEK_SET);//seek to beginning - JSON::Value thisPacket; - - std::set PATIds; - std::map pidToType; - std::map lastBuffer; - //h264::SPSmMta spsdata;//to analyse sps data, and extract resolution etc... + bool first = true; long long int lastBpos = 0; while (packet.FromFile(inFile)){ - //Handle special packets (PAT/PMT) - if(packet.getPID() == 0x00){ - PATIds.clear(); - for (int i = 0; i < ((TS::ProgramAssociationTable&)packet).getProgramCount(); i++){ - PATIds.insert(((TS::ProgramAssociationTable&)packet).getProgramPID(i)); - } - } - if(PATIds.count(packet.getPID())){ - TS::ProgramMappingEntry entry = ((TS::ProgramMappingTable&)packet).getEntry(0); - while(entry){ - unsigned int pid = entry.getElementaryPid(); - pidToType[pid] = entry.getStreamType(); - //Check if the track exists in metadata - if (!myMeta.tracks.count(pid)){ - switch (entry.getStreamType()){ - case 0x1B: - myMeta.tracks[pid].codec = "H264"; - myMeta.tracks[pid].type = "video"; - myMeta.tracks[pid].trackID = pid; - break; - case 0x0F: - myMeta.tracks[pid].codec = "AAC"; - myMeta.tracks[pid].type = "audio"; - myMeta.tracks[pid].trackID = pid; - break; - case 0x81: - myMeta.tracks[pid].codec = "AC3"; - myMeta.tracks[pid].type = "audio"; - myMeta.tracks[pid].trackID = pid; - break; - default: - DEBUG_MSG(DLVL_WARN, "Ignoring unsupported track type %0.2X, on pid %d", entry.getStreamType(), pid); - break; - } - } - entry.advance(); - } - } - if(pidToType.count(packet.getPID())){ - //analyzing audio/video - //we have audio/video payload - //get trackID of this packet - int tid = packet.getPID(); - if (packet.getUnitStart() && lastBuffer.count(tid) && lastBuffer[tid].len){ - parsePESPayload(tid, lastBuffer[tid]); - thisPacket.null(); - thisPacket["data"] = lastBuffer[tid].data; - thisPacket["trackid"] = tid;//last trackid - thisPacket["bpos"] = lastBuffer[tid].bpos; - thisPacket["time"] = lastBuffer[tid].time ; - if (lastBuffer[tid].offset){ - thisPacket["offset"] = lastBuffer[tid].offset; - } - if (lastBuffer[tid].isKey){ - thisPacket["keyframe"] = 1LL; - } - myMeta.update(thisPacket);//metadata was read in - lastBuffer.erase(tid); - } - if (!lastBuffer.count(tid)){ - lastBuffer[tid] = pesBuffer(); - lastBuffer[tid].bpos = lastBpos; - } - lastBuffer[tid].data.append(packet.getPayload(), packet.getPayloadLength()); - if (!lastBuffer[tid].len){ - parsePESHeader(tid, lastBuffer[tid]); - } - if (lastBuffer[tid].data.size() == lastBuffer[tid].len) { - parsePESPayload(tid, lastBuffer[tid]); - if (myMeta.tracks[tid].codec == "AAC"){ - while(lastBuffer[tid].data.size()){ - thisPacket.null(); - thisPacket["data"] = lastBuffer[tid].data.substr(0, lastBuffer[tid].len); - thisPacket["trackid"] = tid;//last trackid - thisPacket["bpos"] = lastBuffer[tid].bpos; - thisPacket["time"] = lastBuffer[tid].time + (long long int)((double)((lastBuffer[tid].curSampleCount - 1024) * 1000)/ myMeta.tracks[tid].rate) ; - myMeta.update(thisPacket);//metadata was read in - lastBuffer[tid].data.erase(0, lastBuffer[tid].len); - parsePESPayload(tid, lastBuffer[tid]); - } - }else{ - thisPacket.null(); - thisPacket["data"] = lastBuffer[tid].data; - thisPacket["trackid"] = tid;//last trackid - thisPacket["bpos"] = lastBuffer[tid].bpos; - thisPacket["time"] = lastBuffer[tid].time ; - if (myMeta.tracks[tid].type == "video"){ - if (lastBuffer[tid].offset){ - thisPacket["offset"] = lastBuffer[tid].offset; - } - if (lastBuffer[tid].isKey){ - thisPacket["keyframe"] = 1LL; - } - } - myMeta.update(thisPacket);//metadata was read in - } - lastBuffer.erase(tid); - } - } + tsStream.parse(packet, lastBpos); lastBpos = ftell(inFile); + while(tsStream.hasPacketOnEachTrack()){ + if (first){ + tsStream.initializeMetadata(myMeta); + first = false; + } + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + myMeta.update(headerPack); + } + } std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str()); oFile << myMeta.toJSON().toNetPacked(); oFile.close(); + exit(1); return true; } - ///Reads a full PES packet, starting at the current byteposition - ///Assumes that you want a full PES for the first PID encountered - ///\todo Update to search for a specific PID - pesBuffer inputTS::readFullPES(int tid){ - pesBuffer pesBuf; - pesBuf.tid = tid; - if (feof(inFile)){ - DEBUG_MSG(DLVL_DEVEL, "Trying to read a PES past the end of the file, returning"); - return pesBuf; - } - unsigned int lastPos = ftell(inFile); - TS::Packet tsBuf; - tsBuf.FromFile(inFile); - //Find first PES start on the selected track - while (tsBuf.getPID() != tid || !tsBuf.getUnitStart()){ - lastPos = ftell(inFile); - tsBuf.FromFile(inFile); - if (feof(inFile)){ - return pesBuf; - } - } - pesBuf.bpos = lastPos; - pesBuf.data.append(tsBuf.getPayload(), tsBuf.getPayloadLength()); - parsePESHeader(tid, pesBuf); - bool unbound = false; - while (pesBuf.data.size() != pesBuf.len){ - //ReadNextPage - tsBuf.FromFile(inFile); - if (tsBuf.getPID() == tid && tsBuf.getUnitStart()){ - unbound = true; - break; - } - if (feof(inFile)){ - DEBUG_MSG(DLVL_DEVEL, "Reached EOF at an unexpected point... what happened?"); - return pesBuf; - } - if (tsBuf.getPID() == tid){ - pesBuf.data.append(tsBuf.getPayload(), tsBuf.getPayloadLength()); - pesBuf.lastPos = ftell(inFile); - } - if (pesBuf.len == 0){ - parsePESHeader(tid, pesBuf); - } - } - pesBuf.lastPos = ftell(inFile); - if (unbound){ - pesBuf.lastPos -= 188; - } - parsePESPayload(tid, pesBuf); - return pesBuf; - } ///Gets the next packet that is to be sent ///At the moment, the logic of sending the last packet that was finished has been implemented, ///but the seeking and finding data is not yet ready. ///\todo Finish the implementation void inputTS::getNext(bool smart){ - static JSON::Value thisPack; - if ( !playbackBuf.size()){ - DEBUG_MSG(DLVL_WARN, "No seek positions set - returning empty packet."); - thisPacket.null(); + thisPacket.null(); + bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); + + if (!hasPacket && (pushing || !feof(inFile))){ + TS::Packet tsBuf; + if (!pushing) { + unsigned int bPos = ftell(inFile); + tsBuf.FromFile(inFile); + tsStream.parse(tsBuf, bPos); + }else{ + while (udpCon.Receive()){ + userClient.keepAlive(); + udpDataBuffer.append(udpCon.data, udpCon.data_len); + while (udpDataBuffer.size() > 188 && (udpDataBuffer[0] != 0x47 || udpDataBuffer[188] != 0x47)){ + size_t syncPos = udpDataBuffer.find("\107", 1); + udpDataBuffer.erase(0, syncPos); + } + while (udpDataBuffer.size() >= 188){ + tsBuf.FromPointer(udpDataBuffer.data()); + tsStream.parse(tsBuf, 0); + udpDataBuffer.erase(0,188); + } + } + } + hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); + } + if (!hasPacket){ return; } - - //Store current buffer - pesBuffer thisBuf = *playbackBuf.begin(); - playbackBuf.erase(playbackBuf.begin()); - - //Seek follow up - fseek(inFile, thisBuf.lastPos, SEEK_SET); - pesBuffer nxtBuf; - if (myMeta.tracks[thisBuf.tid].codec != "AAC" || playbackBuf.size() < 2){ - nxtBuf = readFullPES(thisBuf.tid); + if (selectedTracks.size() == 1){ + tsStream.getPacket(*selectedTracks.begin(), thisPacket); + }else{ + tsStream.getEarliestPacket(thisPacket); } - if (nxtBuf.len){ - if (myMeta.tracks[nxtBuf.tid].codec == "AAC"){//only in case of aac we have more packets, for now - while (nxtBuf.len){ - pesBuffer pesBuf; - pesBuf.tid = nxtBuf.tid; - pesBuf.time = nxtBuf.time + ((double)((nxtBuf.curSampleCount - 1024) * 1000)/ myMeta.tracks[nxtBuf.tid].rate) ; - pesBuf.offset = nxtBuf.offset; - pesBuf.len = nxtBuf.len; - pesBuf.lastPos = nxtBuf.lastPos; - pesBuf.isKey = false; - pesBuf.data = nxtBuf.data.substr(0, nxtBuf.len); - playbackBuf.insert(pesBuf); - - nxtBuf.data.erase(0, nxtBuf.len); - parsePESPayload(thisBuf.tid, nxtBuf); - } - }else{ - nxtBuf.data = nxtBuf.data.substr(0, nxtBuf.len); - playbackBuf.insert(nxtBuf); - } + tsStream.initializeMetadata(myMeta); + if (!myMeta.tracks.count(thisPacket.getTrackId())){ + getNext(); } - - thisPack.null(); - thisPack["data"] = thisBuf.data; - thisPack["trackid"] = thisBuf.tid; - thisPack["bpos"] = thisBuf.bpos; - thisPack["time"] = thisBuf.time; - if (thisBuf.offset){ - thisPack["offset"] = thisBuf.offset; - } - if (thisBuf.isKey){ - thisPack["keyframe"] = 1LL; - } - std::string tmpStr = thisPack.toNetPacked(); - thisPacket.reInit(tmpStr.data(), tmpStr.size()); } ///Seeks to a specific time void inputTS::seek(int seekTime){ + tsStream.clear(); + unsigned long seekPos = 0xFFFFFFFFull; for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (feof(inFile)){ - clearerr(inFile); - fseek(inFile, 0, SEEK_SET); - } - pesBuffer tmpBuf; - tmpBuf.tid = *it; - for (unsigned int i = 0; i < myMeta.tracks[*it].keys.size(); i++){ - if (myMeta.tracks[*it].keys[i].getTime() > seekTime){ + unsigned long thisBPos = 0; + for (std::deque::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++){ + if (keyIt->getTime() > seekTime){ break; } - if (myMeta.tracks[*it].keys[i].getTime() > tmpBuf.time){ - tmpBuf.time = myMeta.tracks[*it].keys[i].getTime(); - tmpBuf.bpos = myMeta.tracks[*it].keys[i].getBpos(); - } + thisBPos = keyIt->getBpos(); } - - bool foundPacket = false; - unsigned long long lastPos; - pesBuffer nxtBuf; - while ( !foundPacket){ - lastPos = ftell(inFile); - if (feof(inFile)){ - DEBUG_MSG(DLVL_WARN, "Reached EOF during seek to %u in track %d - aborting @ %lld", seekTime, *it, lastPos); - return; - } - fseek(inFile, tmpBuf.bpos, SEEK_SET); - nxtBuf = readFullPES(*it); - if (nxtBuf.time >= seekTime){ - foundPacket = true; - }else{ - tmpBuf.bpos = nxtBuf.lastPos; - } - } - if (myMeta.tracks[nxtBuf.tid].codec == "AAC"){//only in case of aac we have more packets, for now - while (nxtBuf.len){ - pesBuffer pesBuf; - pesBuf.tid = nxtBuf.tid; - pesBuf.time = nxtBuf.time + ((double)((nxtBuf.curSampleCount - 1024) * 1000)/ myMeta.tracks[nxtBuf.tid].rate); - pesBuf.offset = nxtBuf.offset; - pesBuf.len = nxtBuf.len; - pesBuf.lastPos = nxtBuf.lastPos; - pesBuf.isKey = false; - pesBuf.data = nxtBuf.data.substr(0, nxtBuf.len); - playbackBuf.insert(pesBuf); - - nxtBuf.data.erase(0, nxtBuf.len); - parsePESPayload(nxtBuf.tid, nxtBuf); - } - }else{ - playbackBuf.insert(nxtBuf); + if (thisBPos < seekPos){ + seekPos = thisBPos; } } + fseek(inFile, seekPos, SEEK_SET);//seek to the correct position } } diff --git a/src/input/input_ts.h b/src/input/input_ts.h index c02f4d0e..293b2478 100755 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -2,57 +2,12 @@ #include #include #include +#include #include #include namespace Mist { - class pesBuffer { - public: - pesBuffer() : lastPos(0), len(0), time(0), offset(0), bpos(0), curSampleCount(0), isKey(false) {} - ///\brief Less-than comparison for seekPos structures. - ///\param rhs The seekPos to compare with. - ///\return Whether this object is smaller than rhs. - bool operator < (const pesBuffer & rhs) const { - if (time < rhs.time) { - return true; - } else { - if (time == rhs.time){ - if (tid < rhs.tid){ - return true; - } - } - } - return false; - } - int tid;//When used for buffering, not for header generation - long long int lastPos;//set by readFullPES, stores the byteposition directly after the last read ts packet - long long int len; - std::string data; - long long int time; - long long int offset; - long long int bpos; - long long int curSampleCount; - bool isKey; - std::string sps; - std::string pps; - }; - - -/* - /// This struct stores all metadata of a track, and sends them once a whole PES has been analyzed and sent - struct trackInfo{ - //saves all data that needs to be sent. - //as packets can be interleaved, the data needs to be temporarily stored - long long int lastPos;//last byte position of trackSelect - long long int pesTime;//the pes time of the current pes packet - bool keyframe;//whether the current pes packet of the track has a keyframe or not - std::string curPayload;//payload to be sent to user - unsigned int packetCount;//number of TS packets read between between and end (to set bpos correctly) - }; - -*/ - /// This class contains all functions needed to implement TS Input class inputTS : public Input { public: @@ -64,16 +19,13 @@ namespace Mist { void getNext(bool smart = true); void seek(int seekTime); void trackSelect(std::string trackSpec); - void parsePESHeader(int tid, pesBuffer & buf); - void parsePESPayload(int tid, pesBuffer & buf); - void parseH264PES(int tid, pesBuffer & buf); - void parseAACPES(int tid, pesBuffer & buf); - pesBuffer readFullPES(int tid); + FILE * inFile;/// playbackBuf;///Used for buffering playback items - std::map firstTimes; + TS::Stream tsStream;///%lu from the corresponding metaPage", pageNumber, tid, mapTid); for (int i = 0; i < 1024; i++) { @@ -266,7 +266,7 @@ namespace Mist { size_t curOffset = pagesByTrack[tid][curPageNum[tid]].curOffset; //Do nothing when there is not enough free space on the page to add the packet. if (pagesByTrack[tid][curPageNum[tid]].dataSize - curOffset < pack.getDataLen()) { - FAIL_MSG("Trying to buffer a packet on page %lu for track %lu~>%lu, but we have a size mismatch. The packet is %lu bytes long, so won't fit at offset %lu on a page of %lu bytes!", curPageNum[tid], tid, mapTid, pack.getDataLen(), curOffset, pagesByTrack[tid][curPageNum[tid]].dataSize); + FAIL_MSG("Trying to buffer a packet on page %lu for track %lu~>%lu, but we have a size mismatch. The packet is %d bytes long, so won't fit at offset %lu on a page of %llu bytes!", curPageNum[tid], tid, mapTid, pack.getDataLen(), curOffset, pagesByTrack[tid][curPageNum[tid]].dataSize); return; } @@ -385,15 +385,24 @@ namespace Mist { curPageNum.erase(tid); } + void InOutBase::bufferLivePacket(JSON::Value & packet) { + DTSC::Packet realPacket; + realPacket.genericFill(packet["time"].asInt(), packet["offset"].asInt(), packet["trackid"].asInt(), packet["data"].asStringRef().c_str(), packet["data"].asStringRef().size(), packet["bpos"].asInt(), packet["keyframe"].asInt()); + bufferLivePacket(realPacket); + } + + ///Buffers a live packet to a page. /// ///Handles both buffering and creation of new pages /// ///Initiates/continues negotiation with the buffer as well ///\param packet The packet to buffer - void InOutBase::bufferLivePacket(JSON::Value & packet) { + void InOutBase::bufferLivePacket(DTSC::Packet & packet){ + myMeta.vod = false; + myMeta.live = true; //Store the trackid for easier access - unsigned long tid = packet["trackid"].asInt(); + unsigned long tid = packet.getTrackId(); //Do nothing if the trackid is invalid if (!tid) { INFO_MSG("Packet without trackid"); @@ -430,7 +439,7 @@ namespace Mist { ///\todo Figure out how to act with declined track here bool isKeyframe = false; if (myMeta.tracks[tid].type == "video") { - if (packet.isMember("keyframe") && packet["keyframe"]) { + if (packet.hasMember("keyframe") && packet.getFlag("keyframe")) { isKeyframe = true; } } else { @@ -439,7 +448,7 @@ namespace Mist { isKeyframe = true; } else { unsigned long lastKey = pagesByTrack[tid].rbegin()->second.lastKeyTime; - if (packet["time"].asInt() - lastKey > 5000) { + if (packet.getTime() - lastKey > 5000) { isKeyframe = true; } } @@ -463,7 +472,7 @@ namespace Mist { pagesByTrack[tid][nextPageNum].dataSize = (25 * 1024 * 1024); pagesByTrack[tid][nextPageNum].pageNum = nextPageNum; } - pagesByTrack[tid].rbegin()->second.lastKeyTime = packet["time"].asInt(); + pagesByTrack[tid].rbegin()->second.lastKeyTime = packet.getTime(); pagesByTrack[tid].rbegin()->second.keyNum++; } //Set the pageNumber if it has not been set yet @@ -535,6 +544,11 @@ namespace Mist { } //Now we either returned or the track has an offset for the user page. //Get the data from the userPage + if (!userClient.getData()){ + char userPageName[100]; + sprintf(userPageName, SHM_USERS, streamName.c_str()); + userClient = IPC::sharedClient(userPageName, 30, true); + } char * tmp = userClient.getData(); if (!tmp) { DEBUG_MSG(DLVL_FAIL, "Failed to negotiate for incoming track %lu, there does not seem to be a connection with the buffer", tid); diff --git a/src/io.h b/src/io.h index b663a08d..927c4dbd 100644 --- a/src/io.h +++ b/src/io.h @@ -36,6 +36,7 @@ namespace Mist { void bufferFinalize(unsigned long tid); void bufferRemove(unsigned long tid, unsigned long pageNumber); void bufferLivePacket(JSON::Value & packet); + void bufferLivePacket(DTSC::Packet & packet); bool isBuffered(unsigned long tid, unsigned long keyNum); unsigned long bufferedOnPage(unsigned long tid, unsigned long keyNum); protected: @@ -53,15 +54,12 @@ namespace Mist { DTSC::Meta myMeta;///< Stores either the input or output metadata std::set selectedTracks;///< Stores the track id's that are either selected for playback or input - std::map > pagesByTrack;/// trackOffset; ///< Offset of data on user page std::map trackState; ///< State of the negotiation for tracks std::map trackMap;/// metaPages;///< For each track, holds the page that describes which dataPages are mapped std::map curPageNum;///< For each track, holds the number page that is currently being written. std::map curPage;///< For each track, holds the page that is currently being written. diff --git a/src/output/output.cpp b/src/output/output.cpp index 8557e6bc..eb1ba702 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -966,15 +966,10 @@ namespace Mist { } } if (!trackMap.size()){ + IPC::userConnection userConn(userClient.getData()); for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){ - unsigned int tId = *it; - char * thisData = userClient.getData() + (6 * tNum); - thisData[0] = ((tId >> 24) & 0xFF); - thisData[1] = ((tId >> 16) & 0xFF); - thisData[2] = ((tId >> 8) & 0xFF); - thisData[3] = ((tId) & 0xFF); - thisData[4] = ((nxtKeyNum[tId] >> 8) & 0xFF); - thisData[5] = ((nxtKeyNum[tId]) & 0xFF); + userConn.setTrackId(tNum, *it); + userConn.setKeynum(tNum, nxtKeyNum[*it]); tNum ++; } }