diff --git a/lib/config.cpp b/lib/config.cpp index 8300a206..65c5bdb2 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -32,6 +32,8 @@ bool Util::Config::is_active = false; std::string Util::Config::libver = PACKAGE_VERSION; +Util::Config::Config(){} + /// Creates a new configuration manager. Util::Config::Config(std::string cmd, std::string version){ vals.null(); diff --git a/lib/config.h b/lib/config.h index 2e25836b..1609efe7 100644 --- a/lib/config.h +++ b/lib/config.h @@ -24,6 +24,7 @@ namespace Util { static std::string libver; ///< Version number of the library as a string. static bool is_active; ///< Set to true by activate(), set to false by the signal handler. //functions + Config(); Config(std::string cmd, std::string version); void addOption(std::string optname, JSON::Value option); void printHelp(std::ostream & output); diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 5f54a0f5..c09cb94f 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -540,7 +540,7 @@ DTSC::File & DTSC::File::operator =(const File & rhs){ F = 0; } endPos = rhs.endPos; - strbuffer = rhs.strbuffer; + myPack = rhs.myPack; metaStorage = rhs.metaStorage; metadata = metaStorage; currtime = rhs.currtime; @@ -581,6 +581,32 @@ DTSC::File::File(std::string filename, bool create){ fseek(F, 0, SEEK_END); endPos = ftell(F); + bool sepHeader = false; + if (!create){ + fseek(F, 0, SEEK_SET); + if (fread(buffer, 4, 1, F) != 1){ + DEBUG_MSG(DLVL_ERROR, "Can't read file contents of %s", filename.c_str()); + fclose(F); + F = 0; + return; + } + if (memcmp(buffer, DTSC::Magic_Header, 4) != 0){ + if (memcmp(buffer, DTSC::Magic_Packet2, 4) != 0){ + File Fhead(filename + ".dtsh"); + if (Fhead){ + metadata = Fhead.metadata; + sepHeader = true; + }else{ + DEBUG_MSG(DLVL_ERROR, "%s is not a valid DTSC file", filename.c_str()); + fclose(F); + F = 0; + return; + } + }else{ + metadata.moreheader = -1; + } + } + } //we now know the first 4 bytes are DTSC::Magic_Header and we have a valid file fseek(F, 4, SEEK_SET); if (fread(buffer, 4, 1, F) != 1){ @@ -591,8 +617,20 @@ DTSC::File::File(std::string filename, bool create){ uint32_t * ubuffer = (uint32_t *)buffer; headerSize = ntohl(ubuffer[0]); } - readHeader(0); - fseek(F, 8 + headerSize, SEEK_SET); + if (metadata.moreheader != -1){ + if (!sepHeader){ + readHeader(0); + fseek(F, 8 + headerSize, SEEK_SET); + }else{ + fseek(F, 0, SEEK_SET); + } + }else{ + fseek(F, 0, SEEK_SET); + File Fhead(filename + ".dtsh"); + if (Fhead){ + metadata = Fhead.metadata; + } + } currframe = 0; } @@ -655,33 +693,29 @@ void DTSC::File::readHeader(int pos){ }else{ DEBUG_MSG(DLVL_ERROR, "Could not read header @ %d", pos); } - strbuffer = ""; metadata = readOnlyMeta(); return; } if (memcmp(buffer, DTSC::Magic_Header, 4) != 0){ DEBUG_MSG(DLVL_ERROR, "Invalid header - %.4s != %.4s @ %i", buffer, DTSC::Magic_Header, pos); - strbuffer = ""; metadata = readOnlyMeta(); return; } if (fread(buffer, 4, 1, F) != 1){ DEBUG_MSG(DLVL_ERROR, "Could not read header size @ %i", pos); - strbuffer = ""; metadata = readOnlyMeta(); return; } - uint32_t * ubuffer = (uint32_t *)buffer; - long packSize = ntohl(ubuffer[0]); - strbuffer.resize(packSize); + long packSize = ntohl(((uint32_t*)buffer)[0]); + std::string strBuffer; + strBuffer.resize(packSize); if (packSize){ - if (fread((void*)strbuffer.c_str(), packSize, 1, F) != 1){ + if (fread((void*)strBuffer.c_str(), packSize, 1, F) != 1){ DEBUG_MSG(DLVL_ERROR, "Could not read header packet @ %i", pos); - strbuffer = ""; metadata = readOnlyMeta(); return; } - JSON::fromDTMI(strbuffer, metaStorage); + JSON::fromDTMI(strBuffer, metaStorage); metadata = readOnlyMeta(metaStorage);//make readonly } //if there is another header, read it and replace metadata with that one. @@ -713,15 +747,12 @@ bool DTSC::File::reachedEOF(){ void DTSC::File::seekNext(){ if ( !currentPositions.size()){ DEBUG_MSG(DLVL_HIGH, "No seek positions set - returning empty packet."); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } - DEBUG_MSG(DLVL_HIGH, "Seeking to %uT%lli @ %llu", currentPositions.begin()->trackID, currentPositions.begin()->seekTime, currentPositions.begin()->bytePos); fseek(F,currentPositions.begin()->bytePos, SEEK_SET); if ( reachedEOF()){ - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } clearerr(F); @@ -737,12 +768,11 @@ void DTSC::File::seekNext(){ }else{ DEBUG_MSG(DLVL_ERROR, "Could not seek to next @ %i", (int)lastreadpos); } - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } if (memcmp(buffer, DTSC::Magic_Header, 4) == 0){ - seek_time(jsonbuffer["time"].asInt() + 1, jsonbuffer["trackid"].asInt(), true); + seek_time(myPack.getTime() + 1, myPack.getTrackId(), true); return seekNext(); } long long unsigned int version = 0; @@ -754,32 +784,28 @@ void DTSC::File::seekNext(){ } if (version == 0){ DEBUG_MSG(DLVL_ERROR, "Invalid packet header @ %#x - %.4s != %.4s @ %d", (unsigned int)lastreadpos, buffer, DTSC::Magic_Packet2, (int)lastreadpos); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } if (fread(buffer, 4, 1, F) != 1){ DEBUG_MSG(DLVL_ERROR, "Could not read packet size @ %d", (int)lastreadpos); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } - uint32_t * ubuffer = (uint32_t *)buffer; - long packSize = ntohl(ubuffer[0]); - strbuffer.resize(packSize); - if (fread((void*)strbuffer.c_str(), packSize, 1, F) != 1){ + long packSize = ntohl(((uint32_t*)buffer)[0]); + std::string strBuffer = "DTP2"; + if (version == 1){ + strBuffer = "DTPD"; + } + strBuffer.append(buffer, 4); + strBuffer.resize(packSize + 8); + if (fread((void*)(strBuffer.c_str() + 8), packSize, 1, F) != 1){ DEBUG_MSG(DLVL_ERROR, "Could not read packet @ %d", (int)lastreadpos); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } - if (version == 2){ - JSON::fromDTMI2(strbuffer, jsonbuffer); - }else{ - if (version == 1){ - JSON::fromDTMI(strbuffer, jsonbuffer); - } - } + const char * tmp = strBuffer.data(); + myPack.reInit(tmp, strBuffer.size()); if ( metadata.merged){ int tempLoc = getBytePos(); char newHeader[20]; @@ -795,9 +821,9 @@ void DTSC::File::seekNext(){ tmpPos.seekTime += ntohl(((int*)newHeader)[4]); insert = true; }else{ - long tid = jsonbuffer["trackid"].asInt(); + long tid = myPack.getTrackId(); for (unsigned int i = 0; i != metadata.tracks[tid].keyLen; i++){ - if (metadata.tracks[tid].keys[i].getTime() > jsonbuffer["time"].asInt()){ + if (metadata.tracks[tid].keys[i].getTime() > myPack.getTime()){ tmpPos.seekTime = metadata.tracks[tid].keys[i].getTime(); tmpPos.bytePos = metadata.tracks[tid].keys[i].getBpos(); tmpPos.trackID = tid; @@ -819,8 +845,9 @@ void DTSC::File::seekNext(){ if (insert){ currentPositions.insert(tmpPos); }else{ - seek_time(jsonbuffer["time"].asInt() + 1, jsonbuffer["trackid"].asInt(), true); + seek_time(myPack.getTime() + 1, myPack.getTrackId(), true); } + seek_bpos(tempLoc); } } @@ -833,31 +860,31 @@ void DTSC::File::parseNext(){ }else{ DEBUG_MSG(DLVL_ERROR, "Could not read header @ %d", (int)lastreadpos); } - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } if (memcmp(buffer, DTSC::Magic_Header, 4) == 0){ if (lastreadpos != 0){ readHeader(lastreadpos); - jsonbuffer = metadata.toJSON(); + std::string tmp = metaStorage.toNetPacked(); + myPack.reInit(tmp.data(), tmp.size()); + DEBUG_MSG(DLVL_DEVEL,"Does this ever even happen?"); }else{ if (fread(buffer, 4, 1, F) != 1){ DEBUG_MSG(DLVL_ERROR, "Could not read header size @ %d", (int)lastreadpos); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } - uint32_t * ubuffer = (uint32_t *)buffer; - long packSize = ntohl(ubuffer[0]); - strbuffer.resize(packSize); - if (fread((void*)strbuffer.c_str(), packSize, 1, F) != 1){ + long packSize = ntohl(((uint32_t*)buffer)[0]); + std::string strBuffer = "DTSC"; + strBuffer.append(buffer, 4); + strBuffer.resize(packSize + 8); + if (fread((void*)(strBuffer.c_str() + 8), packSize, 1, F) != 1){ DEBUG_MSG(DLVL_ERROR, "Could not read header @ %d", (int)lastreadpos); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } - JSON::fromDTMI(strbuffer, jsonbuffer); + myPack.reInit(strBuffer.data(), strBuffer.size()); } return; } @@ -870,30 +897,27 @@ void DTSC::File::parseNext(){ } if (version == 0){ DEBUG_MSG(DLVL_ERROR, "Invalid packet header @ %#x - %.4s != %.4s @ %d", (unsigned int)lastreadpos, buffer, DTSC::Magic_Packet2, (int)lastreadpos); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } if (fread(buffer, 4, 1, F) != 1){ DEBUG_MSG(DLVL_ERROR, "Could not read packet size @ %d", (int)lastreadpos); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } - uint32_t * ubuffer = (uint32_t *)buffer; - long packSize = ntohl(ubuffer[0]); - strbuffer.resize(packSize); - if (fread((void*)strbuffer.c_str(), packSize, 1, F) != 1){ + long packSize = ntohl(((uint32_t*)buffer)[0]); + std::string strBuffer = "DTP2"; + if (version == 1){ + strBuffer = "DTPD"; + } + strBuffer.append(buffer, 4); + strBuffer.resize(packSize + 8); + if (fread((void*)(strBuffer.c_str() + 8), packSize, 1, F) != 1){ DEBUG_MSG(DLVL_ERROR, "Could not read packet @ %d", (int)lastreadpos); - strbuffer = ""; - jsonbuffer.null(); + myPack.null(); return; } - if (version == 2){ - JSON::fromDTMI2(strbuffer, jsonbuffer); - }else{ - JSON::fromDTMI(strbuffer, jsonbuffer); - } + myPack.reInit(strBuffer.data(), strBuffer.size()); } /// Returns the byte positon of the start of the last packet that was read. @@ -902,32 +926,29 @@ long long int DTSC::File::getLastReadPos(){ } /// Returns the internal buffer of the last read packet in raw binary format. -std::string & DTSC::File::getPacket(){ - return strbuffer; -} - -/// Returns the internal buffer of the last read packet in JSON format. -JSON::Value & DTSC::File::getJSON(){ - return jsonbuffer; +DTSC::Packet & DTSC::File::getPacket(){ + return myPack; } bool DTSC::File::seek_time(unsigned int ms, int trackNo, bool forceSeek){ seekPos tmpPos; tmpPos.trackID = trackNo; - if (!forceSeek && jsonbuffer && ms > jsonbuffer["time"].asInt() && trackNo >= jsonbuffer["trackid"].asInt()){ - tmpPos.seekTime = jsonbuffer["time"].asInt(); + if (!forceSeek && myPack && ms > myPack.getTime() && trackNo >= myPack.getTrackId()){ + tmpPos.seekTime = myPack.getTime(); tmpPos.bytePos = getBytePos(); }else{ tmpPos.seekTime = 0; tmpPos.bytePos = 0; } - for (unsigned int i = 0; i < metadata.tracks[trackNo].keyLen; i++){ - if (metadata.tracks[trackNo].keys[i].getTime() > ms){ + DTSC::readOnlyTrack & trackRef = metadata.tracks[trackNo]; + for (unsigned int i = 0; i < trackRef.keyLen; i++){ + long keyTime = trackRef.keys[i].getTime(); + if (keyTime > ms){ break; } - if ((long long unsigned int)metadata.tracks[trackNo].keys[i].getTime() > tmpPos.seekTime){ - tmpPos.seekTime = metadata.tracks[trackNo].keys[i].getTime(); - tmpPos.bytePos = metadata.tracks[trackNo].keys[i].getBpos(); + if ((long long unsigned int)keyTime > tmpPos.seekTime){ + tmpPos.seekTime = keyTime; + tmpPos.bytePos = trackRef.keys[i].getBpos(); } } if (reachedEOF()){ @@ -1007,14 +1028,14 @@ void DTSC::File::writePacket(JSON::Value & newPacket){ } bool DTSC::File::atKeyframe(){ - if (getJSON().isMember("keyframe")){ + if (myPack.getFlag("keyframe")){ return true; } - long long int bTime = jsonbuffer["time"].asInt(); - int trackid = jsonbuffer["trackid"].asInt(); - for (unsigned int i = 0; i < metadata.tracks[trackid].keyLen; i++){ - if (metadata.tracks[trackid].keys[i].getTime() >= bTime){ - return (metadata.tracks[trackid].keys[i].getTime() == bTime); + long long int bTime = myPack.getTime(); + DTSC::readOnlyTrack & trackRef = metadata.tracks[myPack.getTrackId()]; + for (unsigned int i = 0; i < trackRef.keyLen; i++){ + if (trackRef.keys[i].getTime() >= bTime){ + return (trackRef.keys[i].getTime() == bTime); } } return false; diff --git a/lib/dtsc.h b/lib/dtsc.h index 9650ee68..67a5ef50 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -16,8 +16,8 @@ namespace DTSC { bool isFixed(JSON::Value & metadata); - /// This enum holds all possible datatypes for DTSC packets. - enum datatype{ + ///\brief This enum holds all possible datatypes for DTSC packets. + enum datatype { AUDIO, ///< Stream Audio data VIDEO, ///< Stream Video data META, ///< Stream Metadata @@ -30,50 +30,98 @@ namespace DTSC { extern char Magic_Packet[]; ///< The magic bytes for a DTSC packet extern char Magic_Packet2[]; ///< The magic bytes for a DTSC packet version 2 - /// A simple structure used for ordering byte seek positions. + ///\brief A simple structure used for ordering byte seek positions. struct seekPos { - bool operator < (const seekPos& rhs) const { - if (seekTime < rhs.seekTime){ + ///\brief Less-than comparison for seekPos structures. + ///\param rhs The seekPos to compare with. + ///\return Whether this object is smaller than rhs. + bool operator < (const seekPos & rhs) const { + if (seekTime < rhs.seekTime) { return true; - }else{ - if (seekTime == rhs.seekTime){ - if (trackID < rhs.trackID){ + } else { + if (seekTime == rhs.seekTime) { + if (trackID < rhs.trackID) { return true; } } } return false; } - long long unsigned int seekTime; - long long unsigned int bytePos; - unsigned int trackID; + long long unsigned int seekTime;///< Stores the timestamp of the DTSC packet referenced by this structure. + long long unsigned int bytePos;///< Stores the byteposition of the DTSC packet referenced by this structure. + unsigned int trackID;///< Stores the track the DTSC packet referenced by this structure is associated with. + }; + + enum packType{ + DTSC_INVALID, + DTSC_HEAD, + DTSC_V1, + DTSC_V2 + }; + + /// DTSC::Packets can currently be three types: + /// DTSC_HEAD packets are the "DTSC" header string, followed by 4 bytes len and packed content. + /// DTSC_V1 packets are "DTPD", followed by 4 bytes len and packed content. + /// DTSC_V2 packets are "DTP2", followed by 4 bytes len, 4 bytes trackID, 8 bytes time, and packed content. + /// The len is always without the first 8 bytes counted. + class Packet { + public: + Packet(); + Packet(const Packet & rhs); + Packet(const char * data_, unsigned int len, bool noCopy = false); + ~Packet(); + void null(); + void operator = (const Packet & rhs); + operator bool() const; + packType getVersion(); + void reInit(const char * data_, unsigned int len, bool noCopy = false); + void getString(const char * identifier, char *& result, int & len); + void getString(const char * identifier, std::string & result); + void getInt(const char * identifier, int & result); + int getInt(const char * identifier); + void getFlag(const char * identifier, bool & result); + bool getFlag(const char * identifier); + bool hasMember(const char * identifier); + long long unsigned int getTime(); + long int getTrackId(); + char * getData(); + int getDataLen(); + JSON::Value toJSON(); + protected: + bool master; + packType version; + char * findIdentifier(const char * identifier); + void resize(unsigned int size); + char * data; + unsigned int bufferLen; + unsigned int dataLen; }; /// A simple structure used for ordering byte seek positions. struct livePos { - livePos(){ + livePos() { seekTime = 0; trackID = 0; } - livePos(const livePos & rhs){ + livePos(const livePos & rhs) { seekTime = rhs.seekTime; trackID = rhs.trackID; } - void operator = (const livePos& rhs) { + void operator = (const livePos & rhs) { seekTime = rhs.seekTime; trackID = rhs.trackID; } - bool operator == (const livePos& rhs) { + bool operator == (const livePos & rhs) { return seekTime == rhs.seekTime && trackID == rhs.trackID; } - bool operator != (const livePos& rhs) { + bool operator != (const livePos & rhs) { return seekTime != rhs.seekTime || trackID != rhs.trackID; } - bool operator < (const livePos& rhs) const { - if (seekTime < rhs.seekTime){ + bool operator < (const livePos & rhs) const { + if (seekTime < rhs.seekTime) { return true; - }else{ - if (seekTime > rhs.seekTime){ + } else { + if (seekTime > rhs.seekTime) { return false; } } @@ -85,7 +133,7 @@ namespace DTSC { /// A part from the DTSC::Stream ringbuffer. /// Holds information about a buffer that will stay consistent - class Ring{ + class Ring { public: Ring(livePos v); livePos b; @@ -96,7 +144,9 @@ namespace DTSC { volatile int playCount; }; - class Part{ + + ///\brief Basic class for storage of data associated with single DTSC packets, a.k.a. parts. + class Part { public: long getSize(); void setSize(long newSize); @@ -104,12 +154,21 @@ namespace DTSC { void setDuration(short newDuration); long getOffset(); void setOffset(long newOffset); - char* getData(); + char * getData(); + void toPrettyString(std::stringstream & str, int indent = 0); private: + ///\brief Data storage for this packet. + /// + /// - 3 bytes: MSB storage of the payload size of this packet in bytes. + /// - 2 bytes: MSB storage of the duration of this packet in milliseconds. + /// - 4 bytes: MSB storage of the presentation time offset of this packet in milliseconds. char data[9]; }; - class Key{ + ///\brief Basic class for storage of data associated with keyframes. + /// + /// When deleting this object, make sure to remove all DTSC::Part associated with it, if any. If you fail doing this, it *will* cause data corruption. + class Key { public: long long unsigned int getBpos(); void setBpos(long long unsigned int newBpos); @@ -121,12 +180,21 @@ namespace DTSC { void setParts(short newParts); long getTime(); void setTime(long newTime); - char* getData(); + char * getData(); + void toPrettyString(std::stringstream & str, int indent = 0); private: + ///\brief Data storage for this packet. + /// + /// - 5 bytes: MSB storage of the position of the first packet of this keyframe within the file. + /// - 3 bytes: MSB storage of the duration of this keyframe. + /// - 2 bytes: MSB storage of the number of this keyframe. + /// - 2 bytes: MSB storage of the amount of parts in this keyframe. + /// - 4 bytes: MSB storage of the timestamp associated with this keyframe's first packet. char data[16]; }; - class Fragment{ + ///\brief Basic class for storage of data associated with fragments. + class Fragment { public: long getDuration(); void setDuration(long newDuration); @@ -136,26 +204,28 @@ namespace DTSC { void setNumber(short newNumber); long getSize(); void setSize(long newSize); - char* getData(); + char * getData(); + void toPrettyString(std::stringstream & str, int indent = 0); private: char data[11]; }; - class readOnlyTrack{ + class readOnlyTrack { public: readOnlyTrack(); readOnlyTrack(JSON::Value & trackRef); int getSendLen(); void send(Socket::Connection & conn); + void writeTo(char *& p); std::string getIdentifier(); std::string getWritableIdentifier(); JSON::Value toJSON(); long long unsigned int fragLen; - Fragment* fragments; + Fragment * fragments; long long unsigned int keyLen; - Key* keys; + Key * keys; long long unsigned int partLen; - Part* parts; + Part * parts; int trackID; int firstms; int lastms; @@ -175,6 +245,7 @@ namespace DTSC { //vorbis and theora only std::string idHeader; std::string commentHeader; + void toPrettyString(std::stringstream & str, int indent = 0, int verbosity = 0); }; class Track : public readOnlyTrack { @@ -182,32 +253,42 @@ namespace DTSC { Track(); Track(const readOnlyTrack & rhs); Track(JSON::Value & trackRef); - inline operator bool() const {return parts.size();} + inline operator bool() const { + return parts.size(); + } + void update(DTSC::Packet & pack); void update(JSON::Value & pack); int getSendLen(); void send(Socket::Connection & conn); + void writeTo(char *& p); JSON::Value toJSON(); std::deque fragments; std::deque keys; std::deque parts; Key & getKey(unsigned int keyNum); void reset(); + void toPrettyString(std::stringstream & str, int indent = 0, int verbosity = 0); }; class readOnlyMeta { public: readOnlyMeta(); readOnlyMeta(JSON::Value & meta); - inline operator bool() const {return vod || live;} - std::map tracks; + inline operator bool() const { + return vod || live; + } + std::map tracks; bool vod; bool live; bool merged; long long int moreheader; long long int bufferWindow; + unsigned int getSendLen(); void send(Socket::Connection & conn); + void writeTo(char * p); JSON::Value toJSON(); bool isFixed(); + void toPrettyString(std::stringstream & str, int indent = 0, int verbosity = 0); }; class Meta : public readOnlyMeta { @@ -215,16 +296,20 @@ namespace DTSC { Meta(); Meta(const readOnlyMeta & meta); Meta(JSON::Value & meta); - std::map tracks; + std::map tracks; + void update(DTSC::Packet & pack); void update(JSON::Value & pack); + unsigned int getSendLen(); void send(Socket::Connection & conn); + void writeTo(char * p); JSON::Value toJSON(); void reset(); bool isFixed(); + void toPrettyString(std::stringstream & str, int indent = 0, int verbosity = 0); }; /// A simple wrapper class that will open a file and allow easy reading/writing of DTSC data from/to it. - class File{ + class File { public: File(); File(const File & rhs); @@ -241,8 +326,7 @@ namespace DTSC { bool reachedEOF(); void seekNext(); void parseNext(); - std::string & getPacket(); - JSON::Value & getJSON(); + DTSC::Packet & getPacket(); bool seek_time(unsigned int ms); bool seek_time(unsigned int ms, int trackNo, bool forceSeek = false); bool seek_bpos(int bpos); @@ -254,11 +338,10 @@ namespace DTSC { private: long int endPos; void readHeader(int pos); - std::string strbuffer; - JSON::Value jsonbuffer; + DTSC::Packet myPack; JSON::Value metaStorage; readOnlyMeta metadata; - std::map trackMapping; + std::map trackMapping; long long int currtime; long long int lastreadpos; int currframe; @@ -274,7 +357,7 @@ namespace DTSC { /// Holds temporary data for a DTSC stream and provides functions to utilize it. /// Optionally also acts as a ring buffer of a certain requested size. /// If ring buffering mode is enabled, it will automatically grow in size to always contain at least one keyframe. - class Stream{ + class Stream { public: Stream(); virtual ~Stream(); @@ -302,17 +385,18 @@ namespace DTSC { void endStream(); void waitForMeta(Socket::Connection & sourceSocket); void waitForPause(Socket::Connection & sourceSocket); - protected: + protected: void cutOneBuffer(); void resetStream(); - std::map buffers; - std::map > keyframes; + std::map buffers; + std::map > keyframes; virtual void addPacket(JSON::Value & newPack); virtual void addMeta(JSON::Value & newMeta); datatype datapointertype; unsigned int buffercount; unsigned int buffertime; - std::map trackMapping; + std::map trackMapping; virtual void deletionCallback(livePos deleting); }; } + diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 1b2a13da..c406fd49 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -1,24 +1,27 @@ #include "dtsc.h" #include "defines.h" +#include +#include + /// Retrieves a short in network order from the pointer p. -static short btohs(char * p){ +static short btohs(char * p) { return (p[0] << 8) + p[1]; } /// Stores a short value of val in network order to the pointer p. -static void htobs(char * p, short val){ +static void htobs(char * p, short val) { p[0] = (val >> 8) & 0xFF; p[1] = val & 0xFF; } /// Retrieves a long in network order from the pointer p. -static long btohl(char * p){ +static long btohl(char * p) { return (p[0] << 24) + (p[1] << 16) + (p[2] << 8) + p[3]; } /// Stores a long value of val in network order to the pointer p. -static void htobl(char * p, long val){ +static void htobl(char * p, long val) { p[0] = (val >> 24) & 0xFF; p[1] = (val >> 16) & 0xFF; p[2] = (val >> 8) & 0xFF; @@ -26,41 +29,351 @@ static void htobl(char * p, long val){ } namespace DTSC { - long Part::getSize(){ + /// Default constructor for packets - sets a null pointer and invalid packet. + Packet::Packet() { + data = NULL; + bufferLen = 0; + dataLen = 0; + master = false; + version = DTSC_INVALID; + } + + /// Copy constructor for packets, copies an existing packet with same noCopy flag as original. + Packet::Packet(const Packet & rhs) { + Packet(rhs.data, rhs.dataLen, !rhs.master); + } + + /// Data constructor for packets, either references or copies a packet from raw data. + Packet::Packet(const char * data_, unsigned int len, bool noCopy) { + master = false; + bufferLen = 0; + data = NULL; + reInit(data_, len, noCopy); + } + + /// This destructor clears frees the data pointer if the packet was not a reference. + Packet::~Packet() { + if (master && data) { + free(data); + } + } + + /// Copier for packets, copies an existing packet with same noCopy flag as original. + /// If going from copy to noCopy, this will free the data pointer first. + void Packet::operator = (const Packet & rhs) { + if (master && !rhs.master){ + null(); + } + reInit(rhs.data, rhs.dataLen, !rhs.master); + } + + /// Returns true if the packet is deemed valid, false otherwise. + /// Valid packets have a length of at least 8, known header type, and length equal to the length set in the header. + Packet::operator bool() const { + if (!data) { + return false; + } + if (dataLen < 8) { + return false; + } + if (version == DTSC_INVALID){return false;} + if (ntohl(((int *)data)[1]) + 8 != dataLen) { + return false; + } + return true; + } + + /// Returns the recognized packet type. + /// This type is set by reInit and all constructors, and then just referenced from there on. + packType Packet::getVersion(){ + return version; + } + + /// Resets this packet back to the same state as if it had just been freshly constructed. + /// If needed, this frees the data pointer. + void Packet::null() { + if (master && data){ + free(data); + } + master = false; + data = NULL; + bufferLen = 0; + dataLen = 0; + version = DTSC_INVALID; + } + + /// Internally used resize function for when operating in copy mode and the internal buffer is too small. + /// It will only resize up, never down. + void Packet::resize(unsigned int len) { + if (master && len > bufferLen) { + char * tmp = (char *)realloc(data, len); + if (tmp) { + data = tmp; + bufferLen = len; + } else { + DEBUG_MSG(DLVL_FAIL, "Out of memory on parsing a packet"); + } + } + } + + void Packet::reInit(const char * data_, unsigned int len, bool noCopy) { + if (!data_){ + DEBUG_MSG(DLVL_DEVEL, "ReInit received a null pointer with len %d, ignoring", len); + null(); + return; + } + if (data_[0] != 'D' || data_[1] != 'T'){ + DEBUG_MSG(DLVL_HIGH, "ReInit received a pointer that didn't start with 'DT' - data corruption?"); + null(); + return; + } + if (len <= 0) { + len = ntohl(((int *)data_)[1]) + 8; + } + //clear any existing controlled contents + if (master && noCopy){ + null(); + } + //set control flag to !noCopy + master = !noCopy; + //either copy the data, or only the pointer, depending on flag + if (noCopy){ + data = (char*)data_; + }else{ + resize(len); + memcpy(data, data_, len); + } + //check header type and store packet length + dataLen = len; + version = DTSC_INVALID; + if (len > 3){ + if (!memcmp(data, Magic_Packet2, 4)){ + version = DTSC_V2; + }else{ + if (!memcmp(data, Magic_Packet, 4)){ + version = DTSC_V1; + }else{ + if (!memcmp(data, Magic_Header, 4)){ + version = DTSC_HEAD; + }else{ + DEBUG_MSG(DLVL_FAIL, "ReInit received a packet with invalid header"); + return; + } + } + } + }else{ + DEBUG_MSG(DLVL_FAIL, "ReInit received a packet with size < 4"); + return; + } + } + + /// Helper function for findIdentifier + static char * findInside(const char * identifier, char *& p, char * max){ + if (p+1 >= max || p[0] == 0x00){ + return (char*)1;//out of packet! 1 == error + } + if (p[0] == 0x01){ + //int, skip 9 bytes to next value + p+=9; + return 0; + } + if (p[0] == 0x02){ + if (p+4 >= max){ + return (char*)1;//out of packet! 1 == error + } + //string, read size and skip to next value + unsigned int tmpi = p[1] * 256 * 256 * 256 + p[2] * 256 * 256 + p[3] * 256 + p[4]; + p += tmpi + 5; + return 0; + } + if (p[0] == 0xE0 || p[0] == 0xFF){ + p++; + unsigned int id_len = strlen(identifier); + //object, scan contents + while (p[0] + p[1] != 0 && p < max){ //while not encountering 0x0000 (we assume 0x0000EE) + if (p+2 >= max){ + return (char*)1;//out of packet! 1 == error + } + unsigned int tmpi = p[0] * 256 + p[1]; //set tmpi to the UTF-8 length + //compare the name, if match, return contents + if (tmpi == id_len){ + if (memcmp(p+2, identifier, tmpi) == 0){ + return p+2+tmpi; + } + } + p += 2+tmpi;//skip size + //otherwise, search through the contents, if needed, and continue + char * tmp_ret = findInside(identifier, p, max); + if (tmp_ret){ + return tmp_ret; + } + } + p += 3;//skip end marker + return 0; + } + if (p[0] == 0x0A){ + p++; + //array, scan contents + while (p[0] + p[1] != 0 && p < max){ //while not encountering 0x0000 (we assume 0x0000EE) + //search through contents... + char * tmp_ret = findInside(identifier, p, max); + if (tmp_ret){ + return tmp_ret; + } + //no match, continue search + } + p += 3; //skip end marker + return 0; + } + DEBUG_MSG(DLVL_FAIL, "Unimplemented DTMI type %hhx, @ %p / %p - returning.", p[0], p, max); + return (char*)1;//out of packet! 1 == error + } + + char * Packet::findIdentifier(const char * identifier){ + char * p = data; + if (version == DTSC_V2){ + p += 20; + }else{ + p += 8; + } + char * ret = findInside(identifier, p, data+dataLen); + return ret; + } + + void Packet::getString(const char * identifier, char *& result, int & len) { + char * pos = findIdentifier(identifier); + if (pos < (char*)2) { + result = NULL; + len = 0; + return; + } + if (pos[0] != 0x02) { + result = NULL; + len = 0; + return; + } + result = pos + 5; + len = ntohl(((int *)(pos + 1))[0]); + } + + void Packet::getString(const char * identifier, std::string & result) { + char * data = NULL; + int len = 0; + getString(identifier, data, len); + result = std::string(data, len); + } + + void Packet::getInt(const char * identifier, int & result) { + char * pos = findIdentifier(identifier); + if (pos < (char*)2) { + result = 0; + return; + } + if (pos[0] != 0x01) { + result = 0; + return; + } + result = ((long long int)pos[1] << 56) | ((long long int)pos[2] << 48) | ((long long int)pos[3] << 40) | ((long long int)pos[4] << 32) | ((long long int)pos[5] << 24) | ((long long int)pos[6] << 16) | ((long long int)pos[7] << 8) | pos[8]; + } + + int Packet::getInt(const char * identifier) { + int result; + getInt(identifier, result); + return result; + } + + void Packet::getFlag(const char * identifier, bool & result) { + int result_; + getInt(identifier, result_); + result = (bool)result_; + } + + bool Packet::getFlag(const char * identifier) { + bool result; + getFlag(identifier, result); + return result; + } + + bool Packet::hasMember(const char * identifier) { + return findIdentifier(identifier) > (char*)2; + } + + long long unsigned int Packet::getTime() { + if (version != DTSC_V2){ + if (!data){return 0;} + return getInt("time"); + } + return ((long long int)ntohl(((int *)(data + 12))[0]) << 32) | ntohl(((int *)(data + 12))[1]); + } + + long int Packet::getTrackId() { + if (version != DTSC_V2){ + return getInt("trackid"); + } + return ntohl(((int *)data)[2]); + } + + char * Packet::getData() { + return data; + } + + int Packet::getDataLen() { + return dataLen; + + } + + JSON::Value Packet::toJSON(){ + JSON::Value result; + unsigned int i = 8; + if (getVersion() == DTSC_V1){ + JSON::fromDTMI((const unsigned char *)data, dataLen, i, result); + } + if (getVersion() == DTSC_V2){ + JSON::fromDTMI2((const unsigned char *)data, dataLen, i, result); + } + return result; + } + + + long Part::getSize() { return ((long)data[0] << 16) | ((long)data[1] << 8) | data[2]; } - void Part::setSize(long newSize){ + void Part::setSize(long newSize) { data[0] = (newSize & 0xFF0000) >> 16; data[1] = (newSize & 0x00FF00) >> 8; data[2] = (newSize & 0x0000FF); } - short Part::getDuration(){ - return btohs(data+3); + short Part::getDuration() { + return btohs(data + 3); } - void Part::setDuration(short newDuration){ - htobs(data+3, newDuration); + void Part::setDuration(short newDuration) { + htobs(data + 3, newDuration); } - long Part::getOffset(){ - return btohl(data+5); + long Part::getOffset() { + return btohl(data + 5); } - void Part::setOffset(long newOffset){ - htobl(data+5, newOffset); + void Part::setOffset(long newOffset) { + htobl(data + 5, newOffset); } - char* Part::getData(){ + char * Part::getData() { return data; } - long long unsigned int Key::getBpos(){ + void Part::toPrettyString(std::stringstream & str, int indent){ + str << std::string(indent, ' ') << "Part: Size(" << getSize() << "), Dur(" << getDuration() << "), Offset(" << getOffset() << ")" << std::endl; + } + + long long unsigned int Key::getBpos() { return (((long long unsigned int)data[0] << 32) | (data[1] << 24) | (data[2] << 16) | (data[3] << 8) | data[4]); } - void Key::setBpos(long long unsigned int newBpos){ + void Key::setBpos(long long unsigned int newBpos) { data[4] = newBpos & 0xFF; data[3] = (newBpos >> 8) & 0xFF; data[2] = (newBpos >> 16) & 0xFF; @@ -68,81 +381,89 @@ namespace DTSC { data[0] = (newBpos >> 32) & 0xFF; } - long Key::getLength(){ + long Key::getLength() { return ((data[5] << 16) | (data[6] << 8) | data[7]); } - void Key::setLength(long newLength){ + void Key::setLength(long newLength) { data[7] = newLength & 0xFF; data[6] = (newLength >> 8) & 0xFF; data[5] = (newLength >> 16) & 0xFF; } - unsigned short Key::getNumber(){ - return btohs(data+8); + unsigned short Key::getNumber() { + return btohs(data + 8); } - void Key::setNumber(unsigned short newNumber){ - htobs(data+8, newNumber); + void Key::setNumber(unsigned short newNumber) { + htobs(data + 8, newNumber); } - short Key::getParts(){ - return btohs(data+10); + short Key::getParts() { + return btohs(data + 10); } - void Key::setParts(short newParts){ - htobs(data+10, newParts); + void Key::setParts(short newParts) { + htobs(data + 10, newParts); } - long Key::getTime(){ - return btohl(data+12); + long Key::getTime() { + return btohl(data + 12); } - void Key::setTime(long newTime){ - htobl(data+12, newTime); + void Key::setTime(long newTime) { + htobl(data + 12, newTime); } - char* Key::getData(){ + char * Key::getData() { return data; } - long Fragment::getDuration(){ + void Key::toPrettyString(std::stringstream & str, int indent){ + str << std::string(indent, ' ') << "Key " << getNumber() << ": Pos(" << getBpos() << "), Dur(" << getLength() << "), Parts(" << getParts() << "), Time(" << getTime() << ")" << std::endl; + } + + long Fragment::getDuration() { return btohl(data); } - void Fragment::setDuration(long newDuration){ + void Fragment::setDuration(long newDuration) { htobl(data, newDuration); } - char Fragment::getLength(){ + char Fragment::getLength() { return data[4]; } - void Fragment::setLength(char newLength){ + void Fragment::setLength(char newLength) { data[4] = newLength; } - short Fragment::getNumber(){ - return btohs(data+5); + short Fragment::getNumber() { + return btohs(data + 5); } - void Fragment::setNumber(short newNumber){ - htobs(data+5, newNumber); + void Fragment::setNumber(short newNumber) { + htobs(data + 5, newNumber); } - long Fragment::getSize(){ - return btohl(data+7); + long Fragment::getSize() { + return btohl(data + 7); } - void Fragment::setSize(long newSize){ - htobl(data+7, newSize); + void Fragment::setSize(long newSize) { + htobl(data + 7, newSize); } - char* Fragment::getData(){ + char * Fragment::getData() { return data; } - readOnlyTrack::readOnlyTrack(){ + void Fragment::toPrettyString(std::stringstream & str, int indent){ + str << std::string(indent, ' ') << "Fragment " << getNumber() << ": Dur(" << getDuration() << "), Len(" << (int)getLength() << "), Size(" << getSize() << ")" << std::endl; + } + + readOnlyTrack::readOnlyTrack() { fragments = NULL; fragLen = 0; keys = NULL; @@ -153,27 +474,33 @@ namespace DTSC { firstms = 0; lastms = 0; bps = 0; + rate = 0; + size = 0; + channels = 0; + width = 0; + height = 0; + fpks = 0; } - readOnlyTrack::readOnlyTrack(JSON::Value & trackRef){ - if (trackRef.isMember("fragments") && trackRef["fragments"].isString()){ - fragments = (Fragment*)trackRef["fragments"].asStringRef().data(); + readOnlyTrack::readOnlyTrack(JSON::Value & trackRef) { + if (trackRef.isMember("fragments") && trackRef["fragments"].isString()) { + fragments = (Fragment *)trackRef["fragments"].asStringRef().data(); fragLen = trackRef["fragments"].asStringRef().size() / 11; - }else{ + } else { fragments = 0; fragLen = 0; } - if (trackRef.isMember("keys") && trackRef["keys"].isString()){ - keys = (Key*)trackRef["keys"].asStringRef().data(); + if (trackRef.isMember("keys") && trackRef["keys"].isString()) { + keys = (Key *)trackRef["keys"].asStringRef().data(); keyLen = trackRef["keys"].asStringRef().size() / 16; - }else{ + } else { keys = 0; keyLen = 0; } - if (trackRef.isMember("parts") && trackRef["parts"].isString()){ - parts = (Part*)trackRef["parts"].asStringRef().data(); + if (trackRef.isMember("parts") && trackRef["parts"].isString()) { + parts = (Part *)trackRef["parts"].asStringRef().data(); partLen = trackRef["parts"].asStringRef().size() / 9; - }else{ + } else { parts = 0; partLen = 0; } @@ -185,25 +512,37 @@ namespace DTSC { codec = trackRef["codec"].asStringRef(); type = trackRef["type"].asStringRef(); init = trackRef["init"].asStringRef(); - if (type == "audio"){ + if (type == "audio") { rate = trackRef["rate"].asInt(); size = trackRef["size"].asInt(); channels = trackRef["channels"].asInt(); } - if (type == "video"){ + if (type == "video") { width = trackRef["width"].asInt(); height = trackRef["height"].asInt(); fpks = trackRef["fpks"].asInt(); } - if (codec == "vorbis" || codec == "theora"){ + if (codec == "vorbis" || codec == "theora") { idHeader = trackRef["idheader"].asStringRef(); commentHeader = trackRef["commentheader"].asStringRef(); } } - Track::Track(){} + Track::Track() { + trackID = 0; + firstms = 0; + lastms = 0; + bps = 0; + missedFrags = 0; + rate = 0; + size = 0; + channels = 0; + width = 0; + height = 0; + fpks = 0; + } - Track::Track(const readOnlyTrack & rhs){ + Track::Track(const readOnlyTrack & rhs) { trackID = rhs.trackID; firstms = rhs.firstms; lastms = rhs.lastms; @@ -220,29 +559,29 @@ namespace DTSC { fpks = rhs.fpks; idHeader = rhs.idHeader; commentHeader = rhs.commentHeader; - if (rhs.fragments && rhs.fragLen){ + if (rhs.fragments && rhs.fragLen) { fragments = std::deque(rhs.fragments, rhs.fragments + rhs.fragLen); } - if (rhs.keys && rhs.keyLen){ + if (rhs.keys && rhs.keyLen) { keys = std::deque(rhs.keys, rhs.keys + rhs.keyLen); } - if (rhs.parts && rhs.partLen){ + if (rhs.parts && rhs.partLen) { parts = std::deque(rhs.parts, rhs.parts + rhs.partLen); } } - Track::Track(JSON::Value & trackRef){ - if (trackRef.isMember("fragments") && trackRef["fragments"].isString()){ - Fragment* tmp = (Fragment*)trackRef["fragments"].asStringRef().data(); + Track::Track(JSON::Value & trackRef) { + if (trackRef.isMember("fragments") && trackRef["fragments"].isString()) { + Fragment * tmp = (Fragment *)trackRef["fragments"].asStringRef().data(); fragments = std::deque(tmp, tmp + (trackRef["fragments"].asStringRef().size() / 11)); } - if (trackRef.isMember("keys") && trackRef["keys"].isString()){ - Key* tmp = (Key*)trackRef["keys"].asStringRef().data(); + if (trackRef.isMember("keys") && trackRef["keys"].isString()) { + Key * tmp = (Key *)trackRef["keys"].asStringRef().data(); keys = std::deque(tmp, tmp + (trackRef["keys"].asStringRef().size() / 16)); } - if (trackRef.isMember("parts") && trackRef["parts"].isString()){ - Part* tmp = (Part*)trackRef["parts"].asStringRef().data(); - parts = std::deque(tmp,tmp + (trackRef["parts"].asStringRef().size() / 9)); + if (trackRef.isMember("parts") && trackRef["parts"].isString()) { + Part * tmp = (Part *)trackRef["parts"].asStringRef().data(); + parts = std::deque(tmp, tmp + (trackRef["parts"].asStringRef().size() / 9)); } trackID = trackRef["trackid"].asInt(); firstms = trackRef["firstms"].asInt(); @@ -252,72 +591,133 @@ namespace DTSC { codec = trackRef["codec"].asStringRef(); type = trackRef["type"].asStringRef(); init = trackRef["init"].asStringRef(); - if (type == "audio"){ + if (type == "audio") { rate = trackRef["rate"].asInt(); size = trackRef["size"].asInt(); channels = trackRef["channels"].asInt(); } - if (type == "video"){ + if (type == "video") { width = trackRef["width"].asInt(); height = trackRef["height"].asInt(); fpks = trackRef["fpks"].asInt(); } - if (codec == "vorbis" || codec == "theora"){ + if (codec == "vorbis" || codec == "theora") { idHeader = trackRef["idheader"].asStringRef(); commentHeader = trackRef["commentheader"].asStringRef(); } } - void Track::update(JSON::Value & pack){ - if (pack["time"].asInt() < lastms){ - DEBUG_MSG(DLVL_WARN, "Received packets for track %d in wrong order (%d < %d) - ignoring!", (int)trackID, (int)pack["time"].asInt(), (int)lastms); + void Track::update(DTSC::Packet & pack) { + if (pack.getTime() < lastms) { + DEBUG_MSG(DLVL_WARN, "Received packets for track %d in wrong order (%d < %d) - ignoring!", (int)trackID, (int)pack.getTime(), (int)lastms); return; } Part newPart; - newPart.setSize(pack["data"].asStringRef().size()); - newPart.setOffset(pack["offset"].asInt()); - if (parts.size()){ - parts[parts.size()-1].setDuration(pack["time"].asInt() - lastms); - newPart.setDuration(pack["time"].asInt() - lastms); - }else{ + char * data; + int dataLen; + pack.getString("data", data, dataLen); + newPart.setSize(dataLen); + newPart.setOffset(pack.getInt("offset")); + if (parts.size()) { + parts[parts.size() - 1].setDuration(pack.getTime() - lastms); + newPart.setDuration(pack.getTime() - lastms); + } else { newPart.setDuration(0); } parts.push_back(newPart); - lastms = pack["time"].asInt(); - if (pack.isMember("keyframe") || !keys.size() || (type != "video" && pack["time"].asInt() - 5000 > keys[keys.size() - 1].getTime())){ + lastms = pack.getTime(); + if (pack.getFlag("keyframe") || !keys.size() || (type != "video" && pack.getTime() > 5000 && pack.getTime() - 5000 > keys[keys.size() - 1].getTime())) { Key newKey; - newKey.setTime(pack["time"].asInt()); + newKey.setTime(pack.getTime()); newKey.setParts(0); newKey.setLength(0); - if (keys.size()){ + if (keys.size()) { newKey.setNumber(keys[keys.size() - 1].getNumber() + 1); - keys[keys.size() - 1].setLength(pack["time"].asInt() - keys[keys.size() - 1].getTime()); - }else{ + keys[keys.size() - 1].setLength(pack.getTime() - keys[keys.size() - 1].getTime()); + } else { newKey.setNumber(1); } - if (pack.isMember("bpos")){//For VoD - newKey.setBpos(pack["bpos"].asInt()); - }else{ + if (pack.hasMember("bpos")) { //For VoD + newKey.setBpos(pack.getInt("bpos")); + } else { newKey.setBpos(0); } keys.push_back(newKey); firstms = keys[0].getTime(); - if (!fragments.size() || pack["time"].asInt() - 5000 >= getKey(fragments.rbegin()->getNumber()).getTime()){ + if (!fragments.size() || pack.getTime() - 5000 >= getKey(fragments.rbegin()->getNumber()).getTime()) { //new fragment Fragment newFrag; newFrag.setDuration(0); newFrag.setLength(1); newFrag.setNumber(keys[keys.size() - 1].getNumber()); - if (fragments.size()){ - fragments[fragments.size() - 1].setDuration(pack["time"].asInt() - getKey(fragments[fragments.size() - 1].getNumber()).getTime()); - if ( !bps && fragments[fragments.size() - 1].getDuration() > 1000){ + if (fragments.size()) { + fragments[fragments.size() - 1].setDuration(pack.getTime() - getKey(fragments[fragments.size() - 1].getNumber()).getTime()); + if (!bps && fragments[fragments.size() - 1].getDuration() > 1000) { bps = (fragments[fragments.size() - 1].getSize() * 1000) / fragments[fragments.size() - 1].getDuration(); } } newFrag.setDuration(0); newFrag.setSize(0); fragments.push_back(newFrag); - }else{ + } else { + Fragment & lastFrag = fragments[fragments.size() - 1]; + lastFrag.setLength(lastFrag.getLength() + 1); + } + } + keys.rbegin()->setParts(keys.rbegin()->getParts() + 1); + fragments.rbegin()->setSize(fragments.rbegin()->getSize() + dataLen); + } + + void Track::update(JSON::Value & pack) { + if (pack["time"].asInt() < lastms) { + DEBUG_MSG(DLVL_WARN, "Received packets for track %d in wrong order (%d < %d) - ignoring!", (int)trackID, (int)pack["time"].asInt(), (int)lastms); + return; + } + Part newPart; + newPart.setSize(pack["data"].asStringRef().size()); + newPart.setOffset(pack["offset"].asInt()); + if (parts.size()) { + parts[parts.size() - 1].setDuration(pack["time"].asInt() - lastms); + newPart.setDuration(pack["time"].asInt() - lastms); + } else { + newPart.setDuration(0); + } + parts.push_back(newPart); + lastms = pack["time"].asInt(); + if (pack.isMember("keyframe") || !keys.size() || (type != "video" && pack["time"].asInt() - 5000 > keys[keys.size() - 1].getTime())) { + Key newKey; + newKey.setTime(pack["time"].asInt()); + newKey.setParts(0); + newKey.setLength(0); + if (keys.size()) { + newKey.setNumber(keys[keys.size() - 1].getNumber() + 1); + keys[keys.size() - 1].setLength(pack["time"].asInt() - keys[keys.size() - 1].getTime()); + } else { + newKey.setNumber(1); + } + if (pack.isMember("bpos")) { //For VoD + newKey.setBpos(pack["bpos"].asInt()); + } else { + newKey.setBpos(0); + } + keys.push_back(newKey); + firstms = keys[0].getTime(); + if (!fragments.size() || pack["time"].asInt() - 5000 >= getKey(fragments.rbegin()->getNumber()).getTime()) { + //new fragment + Fragment newFrag; + newFrag.setDuration(0); + newFrag.setLength(1); + newFrag.setNumber(keys[keys.size() - 1].getNumber()); + if (fragments.size()) { + fragments[fragments.size() - 1].setDuration(pack["time"].asInt() - getKey(fragments[fragments.size() - 1].getNumber()).getTime()); + if (!bps && fragments[fragments.size() - 1].getDuration() > 1000) { + bps = (fragments[fragments.size() - 1].getSize() * 1000) / fragments[fragments.size() - 1].getDuration(); + } + } + newFrag.setDuration(0); + newFrag.setSize(0); + fragments.push_back(newFrag); + } else { Fragment & lastFrag = fragments[fragments.size() - 1]; lastFrag.setLength(lastFrag.getLength() + 1); } @@ -326,42 +726,42 @@ namespace DTSC { fragments.rbegin()->setSize(fragments.rbegin()->getSize() + pack["data"].asStringRef().size()); } - Key & Track::getKey(unsigned int keyNum){ + Key & Track::getKey(unsigned int keyNum) { static Key empty; - if (keyNum < keys[0].getNumber()){ + if (keyNum < keys[0].getNumber()) { return empty; } - if ((keyNum - keys[0].getNumber()) > keys.size()){ + if ((keyNum - keys[0].getNumber()) > keys.size()) { return empty; } return keys[keyNum - keys[0].getNumber()]; } - std::string readOnlyTrack::getIdentifier(){ + std::string readOnlyTrack::getIdentifier() { std::stringstream result; - if (type == ""){ + if (type == "") { result << "metadata_" << trackID; return result.str(); } result << type << "_"; result << codec << "_"; - if (type == "audio"){ + if (type == "audio") { result << channels << "ch_"; result << rate << "hz"; - }else if (type == "video"){ + } else if (type == "video") { result << width << "x" << height << "_"; result << (double)fpks / 1000 << "fps"; } return result.str(); } - std::string readOnlyTrack::getWritableIdentifier(){ + std::string readOnlyTrack::getWritableIdentifier() { std::stringstream result; result << getIdentifier() << "_" << trackID; - return result.str( ); + return result.str(); } - void Track::reset(){ + void Track::reset() { fragments.clear(); parts.clear(); keys.clear(); @@ -370,7 +770,7 @@ namespace DTSC { lastms = 0; } - readOnlyMeta::readOnlyMeta(){ + readOnlyMeta::readOnlyMeta() { vod = false; live = false; merged = false; @@ -379,27 +779,72 @@ namespace DTSC { bufferWindow = 0; } - readOnlyMeta::readOnlyMeta(JSON::Value & meta){ + readOnlyMeta::readOnlyMeta(JSON::Value & meta) { vod = meta.isMember("vod") && meta["vod"]; live = meta.isMember("live") && meta["live"]; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; - if (meta.isMember("buffer_window")){ + if (meta.isMember("buffer_window")) { bufferWindow = meta["buffer_window"].asInt(); } - for (JSON::ObjIter it = meta["tracks"].ObjBegin(); it != meta["tracks"].ObjEnd(); it++){ - if (it->second.isMember("trackid") && it->second["trackid"]){ + for (JSON::ObjIter it = meta["tracks"].ObjBegin(); it != meta["tracks"].ObjEnd(); it++) { + if (it->second.isMember("trackid") && it->second["trackid"]) { tracks[it->second["trackid"].asInt()] = readOnlyTrack(it->second); } } - if (meta.isMember("moreheader")){ + if (meta.isMember("moreheader")) { moreheader = meta["moreheader"].asInt(); - }else{ + } else { moreheader = 0; } } - Meta::Meta(){ + void readOnlyTrack::toPrettyString(std::stringstream & str, int indent, int verbosity){ + str << std::string(indent, ' ') << "Track " << getWritableIdentifier() << std::endl; + str << std::string(indent + 2, ' ') << "ID: " << trackID << std::endl; + str << std::string(indent + 2, ' ') << "Firstms: " << firstms << std::endl; + str << std::string(indent + 2, ' ') << "Lastms: " << lastms << std::endl; + str << std::string(indent + 2, ' ') << "Bps: " << bps << std::endl; + if (missedFrags){ + str << std::string(indent + 2, ' ') << "missedFrags: " << missedFrags << std::endl; + } + str << std::string(indent + 2, ' ') << "Codec: " << codec << std::endl; + str << std::string(indent + 2, ' ') << "Type: " << type << std::endl; + str << std::string(indent + 2, ' ') << "Init: " << init << std::endl; + if (type == "audio") { + str << std::string(indent + 2, ' ') << "Rate: " << rate << std::endl; + str << std::string(indent + 2, ' ') << "Size: " << size << std::endl; + str << std::string(indent + 2, ' ') << "Channel: " << channels << std::endl; + } else if (type == "video") { + str << std::string(indent + 2, ' ') << "Width: " << width << std::endl; + str << std::string(indent + 2, ' ') << "Height: " << height << std::endl; + str << std::string(indent + 2, ' ') << "Fpks: " << fpks << std::endl; + } + if (codec == "vorbis" || codec == "theora") { + str << std::string(indent + 2, ' ') << "IdHeader: " << idHeader << std::endl; + str << std::string(indent + 2, ' ') << "CommentHeader: " << commentHeader << std::endl; + } + str << std::string(indent + 2, ' ') << "Fragments: " << fragLen << std::endl; + if (fragments && verbosity & 0x01){ + for (unsigned int i = 0; i < fragLen; i++){ + fragments[i].toPrettyString(str, indent + 4); + } + } + str << std::string(indent + 2, ' ') << "Keys: " << keyLen << std::endl; + if (keys && verbosity & 0x02) { + for (unsigned int i = 0; i < keyLen; i++){ + keys[i].toPrettyString(str, indent + 4); + } + } + str << std::string(indent + 2, ' ') << "Parts: " << partLen << std::endl; + if (parts && verbosity & 0x04) { + for (unsigned int i = 0; i < partLen; i++){ + parts[i].toPrettyString(str, indent + 4); + } + } + } + + Meta::Meta() { vod = false; live = false; moreheader = 0; @@ -407,53 +852,106 @@ namespace DTSC { bufferWindow = 0; } - Meta::Meta(const readOnlyMeta & rhs){ + Meta::Meta(const readOnlyMeta & rhs) { vod = rhs.vod; live = rhs.live; merged = rhs.merged; bufferWindow = rhs.bufferWindow; - for (std::map::const_iterator it = rhs.tracks.begin(); it != rhs.tracks.end(); it++){ + for (std::map::const_iterator it = rhs.tracks.begin(); it != rhs.tracks.end(); it++) { tracks[it->first] = it->second; } moreheader = rhs.moreheader; } - Meta::Meta(JSON::Value & meta){ + Meta::Meta(JSON::Value & meta) { vod = meta.isMember("vod") && meta["vod"]; live = meta.isMember("live") && meta["live"]; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; - if (meta.isMember("buffer_window")){ + if (meta.isMember("buffer_window")) { bufferWindow = meta["buffer_window"].asInt(); } - for (JSON::ObjIter it = meta["tracks"].ObjBegin(); it != meta["tracks"].ObjEnd(); it++){ - if(it->second["trackid"].asInt()){ + for (JSON::ObjIter it = meta["tracks"].ObjBegin(); it != meta["tracks"].ObjEnd(); it++) { + if (it->second["trackid"].asInt()) { tracks[it->second["trackid"].asInt()] = Track(it->second); } } - if (meta.isMember("moreheader")){ + if (meta.isMember("moreheader")) { moreheader = meta["moreheader"].asInt(); - }else{ + } else { moreheader = 0; } } - void Meta::update(JSON::Value & pack){ + void Meta::update(JSON::Value & pack) { vod = pack.isMember("bpos"); live = !vod; - if (pack["trackid"].asInt() && tracks.count(pack["trackid"].asInt()) ){ + if (pack["trackid"].asInt() && tracks.count(pack["trackid"].asInt())) { tracks[pack["trackid"].asInt()].update(pack); } } - char * convertShort(short input){ + void Meta::update(DTSC::Packet & pack) { + vod = pack.hasMember("bpos"); + live = !vod; + if (pack.getTrackId() && tracks.count(pack.getTrackId())) { + tracks[pack.getTrackId()].update(pack); + } + } + + void Track::toPrettyString(std::stringstream & str, int indent, int verbosity){ + str << std::string(indent, ' ') << "Track " << getWritableIdentifier() << std::endl; + str << std::string(indent + 2, ' ') << "ID: " << trackID << std::endl; + str << std::string(indent + 2, ' ') << "Firstms: " << firstms << std::endl; + str << std::string(indent + 2, ' ') << "Lastms: " << lastms << std::endl; + str << std::string(indent + 2, ' ') << "Bps: " << bps << std::endl; + if (missedFrags){ + str << std::string(indent + 2, ' ') << "missedFrags: " << missedFrags << std::endl; + } + str << std::string(indent + 2, ' ') << "Codec: " << codec << std::endl; + str << std::string(indent + 2, ' ') << "Type: " << type << std::endl; + str << std::string(indent + 2, ' ') << "Init: " << init << std::endl; + if (type == "audio") { + str << std::string(indent + 2, ' ') << "Rate: " << rate << std::endl; + str << std::string(indent + 2, ' ') << "Size: " << size << std::endl; + str << std::string(indent + 2, ' ') << "Channel: " << channels << std::endl; + } else if (type == "video") { + str << std::string(indent + 2, ' ') << "Width: " << width << std::endl; + str << std::string(indent + 2, ' ') << "Height: " << height << std::endl; + str << std::string(indent + 2, ' ') << "Fpks: " << fpks << std::endl; + } + if (codec == "vorbis" || codec == "theora") { + str << std::string(indent + 2, ' ') << "IdHeader: " << idHeader << std::endl; + str << std::string(indent + 2, ' ') << "CommentHeader: " << commentHeader << std::endl; + } + str << std::string(indent + 2, ' ') << "Fragments: " << fragments.size() << std::endl; + if (verbosity & 0x01){ + for (unsigned int i = 0; i < fragments.size(); i++){ + fragments[i].toPrettyString(str, indent + 4); + } + } + str << std::string(indent + 2, ' ') << "Keys: " << keys.size() << std::endl; + if (verbosity & 0x02) { + for (unsigned int i = 0; i < keys.size(); i++){ + keys[i].toPrettyString(str, indent + 4); + } + } + str << std::string(indent + 2, ' ') << "Parts: " << parts.size() << std::endl; + if (verbosity & 0x04) { + for (unsigned int i = 0; i < parts.size(); i++){ + parts[i].toPrettyString(str, indent + 4); + } + } + } + + char * convertShort(short input) { static char result[2]; result[0] = (input >> 8) & 0xFF; result[1] = (input) & 0xFF; return result; } - char * convertInt(int input){ + char * convertInt(int input) { static char result[4]; result[0] = (input >> 24) & 0xFF; result[1] = (input >> 16) & 0xFF; @@ -462,7 +960,7 @@ namespace DTSC { return result; } - char * convertLongLong(long long int input){ + char * convertLongLong(long long int input) { static char result[8]; result[0] = (input >> 56) & 0xFF; result[1] = (input >> 48) & 0xFF; @@ -475,58 +973,132 @@ namespace DTSC { return result; } - int readOnlyTrack::getSendLen(){ + int readOnlyTrack::getSendLen() { int result = 146 + init.size() + codec.size() + type.size() + getWritableIdentifier().size(); result += fragLen * 11; result += keyLen * 16; result += partLen * 9; - if (type == "audio"){ + if (type == "audio") { result += 49; - }else if (type == "video"){ + } else if (type == "video") { result += 48; } - if (codec == "vorbis" || codec == "theora"){ + if (codec == "vorbis" || codec == "theora") { result += 15 + idHeader.size();//idheader result += 20 + commentHeader.size();//commentheader } - if (missedFrags){result += 23;} + if (missedFrags) { + result += 23; + } return result; } - int Track::getSendLen(){ + int Track::getSendLen() { int result = 146 + init.size() + codec.size() + type.size() + getWritableIdentifier().size(); result += fragments.size() * 11; result += keys.size() * 16; result += parts.size() * 9; - if (type == "audio"){ + if (type == "audio") { result += 49; - }else if (type == "video"){ + } else if (type == "video") { result += 48; } - if (codec == "vorbis" || codec == "theora"){ + if (codec == "vorbis" || codec == "theora") { result += 15 + idHeader.size();//idheader result += 20 + commentHeader.size();//commentheader } - if (missedFrags){result += 23;} + if (missedFrags) { + result += 23; + } return result; } + + static void writePointer(char *& p, const char * src, unsigned int len){ + memcpy(p, src, len); + p += len; + } - void readOnlyTrack::send(Socket::Connection & conn){ + static void writePointer(char *& p, const std::string & src){ + writePointer(p, src.data(), src.size()); + } + + void readOnlyTrack::writeTo(char *& p){ + std::string iden = getWritableIdentifier(); + writePointer(p, convertShort(iden.size()), 2); + writePointer(p, iden); + writePointer(p, "\340", 1);//Begin track object + writePointer(p, "\000\011fragments\002", 12); + writePointer(p, convertInt(fragLen * 11), 4); + writePointer(p, (char *)fragments, fragLen * 11); + writePointer(p, "\000\004keys\002", 7); + writePointer(p, convertInt(keyLen * 16), 4); + writePointer(p, (char *)keys, keyLen * 16); + writePointer(p, "\000\005parts\002", 8); + writePointer(p, convertInt(partLen * 9), 4); + writePointer(p, (char *)parts, partLen * 9); + writePointer(p, "\000\007trackid\001", 10); + writePointer(p, convertLongLong(trackID), 8); + if (missedFrags) { + writePointer(p, "\000\014missed_frags\001", 15); + writePointer(p, convertLongLong(missedFrags), 8); + } + writePointer(p, "\000\007firstms\001", 10); + writePointer(p, convertLongLong(firstms), 8); + writePointer(p, "\000\006lastms\001", 9); + writePointer(p, convertLongLong(lastms), 8); + writePointer(p, "\000\003bps\001", 6); + writePointer(p, convertLongLong(bps), 8); + writePointer(p, "\000\004init\002", 7); + writePointer(p, convertInt(init.size()), 4); + writePointer(p, init); + writePointer(p, "\000\005codec\002", 8); + writePointer(p, convertInt(codec.size()), 4); + writePointer(p, codec); + writePointer(p, "\000\004type\002", 7); + writePointer(p, convertInt(type.size()), 4); + writePointer(p, type); + if (type == "audio") { + writePointer(p, "\000\004rate\001", 7); + writePointer(p, convertLongLong(rate), 8); + writePointer(p, "\000\004size\001", 7); + writePointer(p, convertLongLong(size), 8); + writePointer(p, "\000\010channels\001", 11); + writePointer(p, convertLongLong(channels), 8); + } else if (type == "video") { + writePointer(p, "\000\005width\001", 8); + writePointer(p, convertLongLong(width), 8); + writePointer(p, "\000\006height\001", 9); + writePointer(p, convertLongLong(height), 8); + writePointer(p, "\000\004fpks\001", 7); + writePointer(p, convertLongLong(fpks), 8); + } + if (codec == "vorbis" || codec == "theora") { + writePointer(p, "\000\010idheader\002", 11); + writePointer(p, convertInt(idHeader.size()), 4); + writePointer(p, idHeader); + writePointer(p, "\000\015commentheader\002", 16); + writePointer(p, convertInt(commentHeader.size()), 4); + writePointer(p, commentHeader); + } + writePointer(p, "\000\000\356", 3);//End this track Object + } + + void readOnlyTrack::send(Socket::Connection & conn) { conn.SendNow(convertShort(getWritableIdentifier().size()), 2); conn.SendNow(getWritableIdentifier()); conn.SendNow("\340", 1);//Begin track object conn.SendNow("\000\011fragments\002", 12); - conn.SendNow(convertInt(fragLen*11), 4); - conn.SendNow((char*)fragments, fragLen*11); + conn.SendNow(convertInt(fragLen * 11), 4); + conn.SendNow((char *)fragments, fragLen * 11); conn.SendNow("\000\004keys\002", 7); - conn.SendNow(convertInt(keyLen*16), 4); - conn.SendNow((char*)keys, keyLen*16); + conn.SendNow(convertInt(keyLen * 16), 4); + conn.SendNow((char *)keys, keyLen * 16); conn.SendNow("\000\005parts\002", 8); - conn.SendNow(convertInt(partLen*9), 4); - conn.SendNow((char*)parts, partLen*9); + conn.SendNow(convertInt(partLen * 9), 4); + conn.SendNow((char *)parts, partLen * 9); conn.SendNow("\000\007trackid\001", 10); conn.SendNow(convertLongLong(trackID), 8); - if (missedFrags){ + if (missedFrags) { conn.SendNow("\000\014missed_frags\001", 15); conn.SendNow(convertLongLong(missedFrags), 8); } @@ -545,14 +1117,14 @@ namespace DTSC { conn.SendNow("\000\004type\002", 7); conn.SendNow(convertInt(type.size()), 4); conn.SendNow(type); - if (type == "audio"){ + if (type == "audio") { conn.SendNow("\000\004rate\001", 7); conn.SendNow(convertLongLong(rate), 8); conn.SendNow("\000\004size\001", 7); conn.SendNow(convertLongLong(size), 8); conn.SendNow("\000\010channels\001", 11); conn.SendNow(convertLongLong(channels), 8); - }else if (type == "video"){ + } else if (type == "video") { conn.SendNow("\000\005width\001", 8); conn.SendNow(convertLongLong(width), 8); conn.SendNow("\000\006height\001", 9); @@ -560,7 +1132,7 @@ namespace DTSC { conn.SendNow("\000\004fpks\001", 7); conn.SendNow(convertLongLong(fpks), 8); } - if (codec == "vorbis" || codec == "theora"){ + if (codec == "vorbis" || codec == "theora") { conn.SendNow("\000\010idheader\002", 11); conn.SendNow(convertInt(idHeader.size()), 4); conn.SendNow(idHeader); @@ -570,29 +1142,95 @@ namespace DTSC { } conn.SendNow("\000\000\356", 3);//End this track Object } + + void Track::writeTo(char *& p){ + writePointer(p, convertShort(getWritableIdentifier().size()), 2); + writePointer(p, getWritableIdentifier()); + writePointer(p, "\340", 1);//Begin track object + writePointer(p, "\000\011fragments\002", 12); + writePointer(p, convertInt(fragments.size() * 11), 4); + for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { + writePointer(p, it->getData(), 11); + } + writePointer(p, "\000\004keys\002", 7); + writePointer(p, convertInt(keys.size() * 16), 4); + for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { + writePointer(p, it->getData(), 16); + } + writePointer(p, "\000\005parts\002", 8); + writePointer(p, convertInt(parts.size() * 9), 4); + for (std::deque::iterator it = parts.begin(); it != parts.end(); it++) { + writePointer(p, it->getData(), 9); + } + writePointer(p, "\000\007trackid\001", 10); + writePointer(p, convertLongLong(trackID), 8); + if (missedFrags) { + writePointer(p, "\000\014missed_frags\001", 15); + writePointer(p, convertLongLong(missedFrags), 8); + } + writePointer(p, "\000\007firstms\001", 10); + writePointer(p, convertLongLong(firstms), 8); + writePointer(p, "\000\006lastms\001", 9); + writePointer(p, convertLongLong(lastms), 8); + writePointer(p, "\000\003bps\001", 6); + writePointer(p, convertLongLong(bps), 8); + writePointer(p, "\000\004init\002", 7); + writePointer(p, convertInt(init.size()), 4); + writePointer(p, init); + writePointer(p, "\000\005codec\002", 8); + writePointer(p, convertInt(codec.size()), 4); + writePointer(p, codec); + writePointer(p, "\000\004type\002", 7); + writePointer(p, convertInt(type.size()), 4); + writePointer(p, type); + if (type == "audio") { + writePointer(p, "\000\004rate\001", 7); + writePointer(p, convertLongLong(rate), 8); + writePointer(p, "\000\004size\001", 7); + writePointer(p, convertLongLong(size), 8); + writePointer(p, "\000\010channels\001", 11); + writePointer(p, convertLongLong(channels), 8); + } else if (type == "video") { + writePointer(p, "\000\005width\001", 8); + writePointer(p, convertLongLong(width), 8); + writePointer(p, "\000\006height\001", 9); + writePointer(p, convertLongLong(height), 8); + writePointer(p, "\000\004fpks\001", 7); + writePointer(p, convertLongLong(fpks), 8); + } + if (codec == "vorbis" || codec == "theora") { + writePointer(p, "\000\010idheader\002", 11); + writePointer(p, convertInt(idHeader.size()), 4); + writePointer(p, idHeader); + writePointer(p, "\000\015commentheader\002", 16); + writePointer(p, convertInt(commentHeader.size()), 4); + writePointer(p, commentHeader); + } + writePointer(p, "\000\000\356", 3);//End this track Object + } - void Track::send(Socket::Connection & conn){ + void Track::send(Socket::Connection & conn) { conn.SendNow(convertShort(getWritableIdentifier().size()), 2); conn.SendNow(getWritableIdentifier()); conn.SendNow("\340", 1);//Begin track object conn.SendNow("\000\011fragments\002", 12); - conn.SendNow(convertInt(fragments.size()*11), 4); - for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++){ + conn.SendNow(convertInt(fragments.size() * 11), 4); + for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { conn.SendNow(it->getData(), 11); } conn.SendNow("\000\004keys\002", 7); - conn.SendNow(convertInt(keys.size()*16), 4); - for (std::deque::iterator it = keys.begin(); it != keys.end(); it++){ + conn.SendNow(convertInt(keys.size() * 16), 4); + for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { conn.SendNow(it->getData(), 16); } conn.SendNow("\000\005parts\002", 8); - conn.SendNow(convertInt(parts.size()*9), 4); - for (std::deque::iterator it = parts.begin(); it != parts.end(); it++){ + conn.SendNow(convertInt(parts.size() * 9), 4); + for (std::deque::iterator it = parts.begin(); it != parts.end(); it++) { conn.SendNow(it->getData(), 9); } conn.SendNow("\000\007trackid\001", 10); conn.SendNow(convertLongLong(trackID), 8); - if (missedFrags){ + if (missedFrags) { conn.SendNow("\000\014missed_frags\001", 15); conn.SendNow(convertLongLong(missedFrags), 8); } @@ -611,14 +1249,14 @@ namespace DTSC { conn.SendNow("\000\004type\002", 7); conn.SendNow(convertInt(type.size()), 4); conn.SendNow(type); - if (type == "audio"){ + if (type == "audio") { conn.SendNow("\000\004rate\001", 7); conn.SendNow(convertLongLong(rate), 8); conn.SendNow("\000\004size\001", 7); conn.SendNow(convertLongLong(size), 8); conn.SendNow("\000\010channels\001", 11); conn.SendNow(convertLongLong(channels), 8); - }else if (type == "video"){ + } else if (type == "video") { conn.SendNow("\000\005width\001", 8); conn.SendNow(convertLongLong(width), 8); conn.SendNow("\000\006height\001", 9); @@ -626,7 +1264,7 @@ namespace DTSC { conn.SendNow("\000\004fpks\001", 7); conn.SendNow(convertLongLong(fpks), 8); } - if (codec == "vorbis" || codec == "theora"){ + if (codec == "vorbis" || codec == "theora") { conn.SendNow("\000\010idheader\002", 11); conn.SendNow(convertInt(idHeader.size()), 4); conn.SendNow(idHeader); @@ -637,64 +1275,66 @@ namespace DTSC { conn.SendNow("\000\000\356", 3);//End this track Object } - void readOnlyMeta::send(Socket::Connection & conn){ - int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ + unsigned int readOnlyMeta::getSendLen(){ + unsigned int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { dataLen += it->second.getSendLen(); } + return dataLen; + } + + void readOnlyMeta::writeTo(char * p){ + int dataLen = getSendLen(); + writePointer(p, DTSC::Magic_Header, 4); + writePointer(p, convertInt(dataLen), 4); + writePointer(p, "\340\000\006tracks\340", 10); + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + it->second.writeTo(p); + } + writePointer(p, "\000\000\356", 3); + if (vod) { + writePointer(p, "\000\003vod\001", 6); + writePointer(p, convertLongLong(1), 8); + } + if (live) { + writePointer(p, "\000\004live\001", 7); + writePointer(p, convertLongLong(1), 8); + } + if (merged) { + writePointer(p, "\000\006merged\001", 9); + writePointer(p, convertLongLong(1), 8); + } + if (bufferWindow) { + writePointer(p, "\000\015buffer_window\001", 16); + writePointer(p, convertLongLong(bufferWindow), 8); + } + writePointer(p, "\000\012moreheader\001", 13); + writePointer(p, convertLongLong(moreheader), 8); + writePointer(p, "\000\000\356", 3);//End global object + } + + void readOnlyMeta::send(Socket::Connection & conn) { + int dataLen = getSendLen(); conn.SendNow(DTSC::Magic_Header, 4); conn.SendNow(convertInt(dataLen), 4); conn.SendNow("\340\000\006tracks\340", 10); - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { it->second.send(conn); } conn.SendNow("\000\000\356", 3); - if (vod){ + if (vod) { conn.SendNow("\000\003vod\001", 6); conn.SendNow(convertLongLong(1), 8); } - if (live){ + if (live) { conn.SendNow("\000\004live\001", 7); conn.SendNow(convertLongLong(1), 8); } - if (merged){ + if (merged) { conn.SendNow("\000\006merged\001", 9); conn.SendNow(convertLongLong(1), 8); } - if (bufferWindow){ - conn.SendNow("\000\015buffer_window\001", 16); - conn.SendNow(convertLongLong(bufferWindow), 8); - } - conn.SendNow("\000\012moreheader\001", 13); - conn.SendNow(convertLongLong(moreheader), 8); - conn.SendNow("\000\000\356", 3);//End global object - } - - void Meta::send(Socket::Connection & conn){ - int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ - dataLen += it->second.getSendLen(); - } - conn.SendNow(DTSC::Magic_Header, 4); - conn.SendNow(convertInt(dataLen), 4); - conn.SendNow("\340\000\006tracks\340", 10); - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ - it->second.send(conn); - } - conn.SendNow("\000\000\356", 3);//End tracks object - if (vod){ - conn.SendNow("\000\003vod\001", 6); - conn.SendNow(convertLongLong(1), 8); - } - if (live){ - conn.SendNow("\000\004live\001", 7); - conn.SendNow(convertLongLong(1), 8); - } - if (merged){ - conn.SendNow("\000\006merged\001", 9); - conn.SendNow(convertLongLong(1), 8); - } - if (bufferWindow){ + if (bufferWindow) { conn.SendNow("\000\015buffer_window\001", 16); conn.SendNow(convertLongLong(bufferWindow), 8); } @@ -703,60 +1343,128 @@ namespace DTSC { conn.SendNow("\000\000\356", 3);//End global object } - JSON::Value readOnlyTrack::toJSON(){ + unsigned int Meta::getSendLen(){ + unsigned int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + dataLen += it->second.getSendLen(); + } + return dataLen; + } + + void Meta::writeTo(char * p){ + int dataLen = getSendLen(); + writePointer(p, DTSC::Magic_Header, 4); + writePointer(p, convertInt(dataLen), 4); + writePointer(p, "\340\000\006tracks\340", 10); + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + it->second.writeTo(p); + } + writePointer(p, "\000\000\356", 3);//End tracks object + if (vod) { + writePointer(p, "\000\003vod\001", 6); + writePointer(p, convertLongLong(1), 8); + } + if (live) { + writePointer(p, "\000\004live\001", 7); + writePointer(p, convertLongLong(1), 8); + } + if (merged) { + writePointer(p, "\000\006merged\001", 9); + writePointer(p, convertLongLong(1), 8); + } + if (bufferWindow) { + writePointer(p, "\000\015buffer_window\001", 16); + writePointer(p, convertLongLong(bufferWindow), 8); + } + writePointer(p, "\000\012moreheader\001", 13); + writePointer(p, convertLongLong(moreheader), 8); + writePointer(p, "\000\000\356", 3);//End global object + } + + void Meta::send(Socket::Connection & conn) { + int dataLen = getSendLen(); + conn.SendNow(DTSC::Magic_Header, 4); + conn.SendNow(convertInt(dataLen), 4); + conn.SendNow("\340\000\006tracks\340", 10); + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + it->second.send(conn); + } + conn.SendNow("\000\000\356", 3);//End tracks object + if (vod) { + conn.SendNow("\000\003vod\001", 6); + conn.SendNow(convertLongLong(1), 8); + } + if (live) { + conn.SendNow("\000\004live\001", 7); + conn.SendNow(convertLongLong(1), 8); + } + if (merged) { + conn.SendNow("\000\006merged\001", 9); + conn.SendNow(convertLongLong(1), 8); + } + if (bufferWindow) { + conn.SendNow("\000\015buffer_window\001", 16); + conn.SendNow(convertLongLong(bufferWindow), 8); + } + conn.SendNow("\000\012moreheader\001", 13); + conn.SendNow(convertLongLong(moreheader), 8); + conn.SendNow("\000\000\356", 3);//End global object + } + + JSON::Value readOnlyTrack::toJSON() { JSON::Value result; - if (fragments){ - result["fragments"] = std::string((char*)fragments, fragLen * 11); + if (fragments) { + result["fragments"] = std::string((char *)fragments, fragLen * 11); } - if (keys){ - result["keys"] = std::string((char*)keys, keyLen * 16); + if (keys) { + result["keys"] = std::string((char *)keys, keyLen * 16); } - if (parts){ - result["parts"] = std::string((char*)parts, partLen * 9); + if (parts) { + result["parts"] = std::string((char *)parts, partLen * 9); } result["trackid"] = trackID; result["firstms"] = firstms; result["lastms"] = lastms; result["bps"] = bps; - if (missedFrags){ + if (missedFrags) { result["missed_frags"] = missedFrags; } result["codec"] = codec; result["type"] = type; result["init"] = init; - if (type == "audio"){ + if (type == "audio") { result["rate"] = rate; result["size"] = size; result["channels"] = channels; - }else if (type == "video"){ + } else if (type == "video") { result["width"] = width; result["height"] = height; result["fpks"] = fpks; } - if (codec == "vorbis" || codec == "theora"){ + if (codec == "vorbis" || codec == "theora") { result["idheader"] = idHeader; result["commentheader"] = commentHeader; } return result; } - - JSON::Value Track::toJSON(){ + + JSON::Value Track::toJSON() { JSON::Value result; std::string tmp; tmp.reserve(fragments.size() * 11); - for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++){ + for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { tmp.append(it->getData(), 11); } result["fragments"] = tmp; tmp = ""; tmp.reserve(keys.size() * 16); - for (std::deque::iterator it = keys.begin(); it != keys.end(); it++){ + for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { tmp.append(it->getData(), 16); } result["keys"] = tmp; tmp = ""; tmp.reserve(parts.size() * 9); - for (std::deque::iterator it = parts.begin(); it != parts.end(); it++){ + for (std::deque::iterator it = parts.begin(); it != parts.end(); it++) { tmp.append(it->getData(), 9); } result["parts"] = tmp; @@ -764,94 +1472,135 @@ namespace DTSC { result["firstms"] = firstms; result["lastms"] = lastms; result["bps"] = bps; - if (missedFrags){ + if (missedFrags) { result["missed_frags"] = missedFrags; } result["codec"] = codec; result["type"] = type; result["init"] = init; - if (type == "audio"){ + if (type == "audio") { result["rate"] = rate; result["size"] = size; result["channels"] = channels; - }else if (type == "video"){ + } else if (type == "video") { result["width"] = width; result["height"] = height; result["fpks"] = fpks; } - if (codec == "vorbis" || codec == "theora"){ + if (codec == "vorbis" || codec == "theora") { result["idheader"] = idHeader; result["commentheader"] = commentHeader; } return result; } - JSON::Value Meta::toJSON(){ + JSON::Value Meta::toJSON() { JSON::Value result; - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { result["tracks"][it->second.getWritableIdentifier()] = it->second.toJSON(); } - if (vod){ + if (vod) { result["vod"] = 1ll; } - if (live){ + if (live) { result["live"] = 1ll; } - if (merged){ + if (merged) { result["merged"] = 1ll; } - if (bufferWindow){ + if (bufferWindow) { result["buffer_window"] = bufferWindow; } result["moreheader"] = moreheader; return result; } - JSON::Value readOnlyMeta::toJSON(){ + void readOnlyMeta::toPrettyString(std::stringstream & str, int indent, int verbosity){ + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + it->second.toPrettyString(str, indent, verbosity); + } + if (vod) { + str << std::string(indent, ' ') << "Video on Demand" << std::endl; + } + if (live) { + str << std::string(indent, ' ') << "Live" << std::endl; + } + if (merged) { + str << std::string(indent, ' ') << "Merged file" << std::endl; + } + if (bufferWindow) { + str << std::string(indent, ' ') << "Buffer Window: " << bufferWindow << std::endl; + } + str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl; + } + + void Meta::toPrettyString(std::stringstream & str, int indent, int verbosity){ + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + it->second.toPrettyString(str, indent, verbosity); + } + if (vod) { + str << std::string(indent, ' ') << "Video on Demand" << std::endl; + } + if (live) { + str << std::string(indent, ' ') << "Live" << std::endl; + } + if (merged) { + str << std::string(indent, ' ') << "Merged file" << std::endl; + } + if (bufferWindow) { + str << std::string(indent, ' ') << "Buffer Window: " << bufferWindow << std::endl; + } + str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl; + } + + JSON::Value readOnlyMeta::toJSON() { JSON::Value result; - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { result["tracks"][it->second.getWritableIdentifier()] = it->second.toJSON(); } - if (vod){ + if (vod) { result["vod"] = 1ll; } - if (live){ + if (live) { result["live"] = 1ll; } - if (merged){ + if (merged) { result["merged"] = 1ll; } result["moreheader"] = moreheader; - if (bufferWindow){ + if (bufferWindow) { result["buffer_window"] = bufferWindow; } return result; } - void Meta::reset(){ - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ + void Meta::reset() { + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { it->second.reset(); } } - bool readOnlyMeta::isFixed(){ - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ - if ( !it->second.keyLen || !(it->second.keys[it->second.keyLen - 1].getBpos())){ + bool readOnlyMeta::isFixed() { + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + if (!it->second.keyLen || !(it->second.keys[it->second.keyLen - 1].getBpos())) { return false; } } return true; } - bool Meta::isFixed(){ - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ - if (it->second.type == "meta" || it->second.type == ""){ + bool Meta::isFixed() { + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + if (it->second.type == "meta" || it->second.type == "") { continue; } - if (!it->second.keys.size() || !(it->second.keys.rbegin()->getBpos())){ + if (!it->second.keys.size() || !(it->second.keys.rbegin()->getBpos())) { return false; } } return true; } } + + + diff --git a/lib/flv_tag.cpp b/lib/flv_tag.cpp index 2ef904c9..a585660f 100644 --- a/lib/flv_tag.cpp +++ b/lib/flv_tag.cpp @@ -366,132 +366,111 @@ FLV::Tag & FLV::Tag::operator=(const FLV::Tag& O){ /// FLV loader function from DTSC. /// Takes the DTSC data and makes it into FLV. bool FLV::Tag::DTSCLoader(DTSC::Stream & S){ + std::string tmp = S.getPacket().toNetPacked(); + DTSC::Packet tmpPack(tmp.data(), tmp.size()); + return DTSCLoader(tmpPack, S.metadata.tracks[S.getPacket()["trackid"].asInt()]); +} + +bool FLV::Tag::DTSCLoader(DTSC::Packet & packData, DTSC::Track & track){ std::string meta_str; - DTSC::Track & track = S.metadata.tracks[S.getPacket()["trackid"].asInt()]; - switch (S.lastType()){ - case DTSC::VIDEO: - len = S.lastData().length() + 16; - if (track.codec == "H264"){ - len += 4; - } - break; - case DTSC::AUDIO: - len = S.lastData().length() + 16; - if (track.codec == "AAC"){ - len += 1; - } - break; - case DTSC::META:{ - AMF::Object amfdata("root", AMF::AMF0_DDV_CONTAINER); - amfdata.addContent(AMF::Object("", "onMetaData")); - amfdata.addContent(AMF::Object("", AMF::AMF0_ECMA_ARRAY)); - for (JSON::ObjIter it = S.getPacket()["data"].ObjBegin(); it != S.getPacket()["data"].ObjEnd(); it++){ - if (it->second.asInt()){ - amfdata.getContentP(1)->addContent(AMF::Object(it->first, it->second.asInt(), AMF::AMF0_NUMBER)); - }else{ - amfdata.getContentP(1)->addContent(AMF::Object(it->first, it->second.asString(), AMF::AMF0_STRING)); - } - } - meta_str = amfdata.Pack(); - len = meta_str.length() + 15; - break; + len = 0; + if (track.type == "video"){ + char * tmpData = 0; + packData.getString("data", tmpData, len); + len += 16; + if (track.codec == "H264"){ + len += 4; } - default: //ignore all other types (there are currently no other types...) - return false; - break; - } - if (len > 0){ if ( !checkBufferSize()){ return false; } - switch (S.lastType()){ - case DTSC::VIDEO: - if ((unsigned int)len == S.lastData().length() + 16){ - memcpy(data + 12, S.lastData().c_str(), S.lastData().length()); - }else{ - memcpy(data + 16, S.lastData().c_str(), S.lastData().length()); - if (S.getPacket().isMember("nalu")){ - data[12] = 1; - }else{ - data[12] = 2; - } - offset(S.getPacket()["offset"].asInt()); - } - data[11] = 0; - if (track.codec == "H264"){ - data[11] += 7; - } - if (track.codec == "H263"){ - data[11] += 2; - } - if (S.getPacket().isMember("keyframe")){ - data[11] += 0x10; - } - if (S.getPacket().isMember("interframe")){ - data[11] += 0x20; - } - if (S.getPacket().isMember("disposableframe")){ - data[11] += 0x30; - } - break; - case DTSC::AUDIO: { - if ((unsigned int)len == S.lastData().length() + 16){ - memcpy(data + 12, S.lastData().c_str(), S.lastData().length()); - }else{ - memcpy(data + 13, S.lastData().c_str(), S.lastData().length()); - data[12] = 1; //raw AAC data, not sequence header - } - data[11] = 0; - if (track.codec == "AAC"){ - data[11] += 0xA0; - } - if (track.codec == "MP3"){ - data[11] += 0x20; - } - unsigned int datarate = track.rate; - if (datarate >= 44100){ - data[11] += 0x0C; - }else if (datarate >= 22050){ - data[11] += 0x08; - }else if (datarate >= 11025){ - data[11] += 0x04; - } - if (track.size == 16){ - data[11] += 0x02; - } - if (track.channels > 1){ - data[11] += 0x01; - } - break; + if (track.codec == "H264"){ + memcpy(data + 16, tmpData, len - 20); + if (packData.getFlag("nalu")){ + data[12] = 1; + }else{ + data[12] = 2; } - case DTSC::META: - memcpy(data + 11, meta_str.c_str(), meta_str.length()); - break; - default: - break; + if (packData.getInt("offset") < 0){ + offset(0); + }else{ + offset(packData.getInt("offset")); + } + }else{ + memcpy(data + 12, tmpData, len - 16); + } + data[11] = 0; + if (track.codec == "H264"){ + data[11] += 7; + } + if (track.codec == "H263"){ + data[11] += 2; + } + if (packData.getFlag("keyframe")){ + data[11] += 0x10; + } + if (packData.getFlag("interframe")){ + data[11] += 0x20; + } + if (packData.getFlag("disposableframe")){ + data[11] += 0x30; } } - setLen(); - switch (S.lastType()){ - case DTSC::VIDEO: - data[0] = 0x09; - break; - case DTSC::AUDIO: - data[0] = 0x08; - break; - case DTSC::META: - data[0] = 0x12; - break; - default: - break; + if (track.type == "audio"){ + char * tmpData = 0; + packData.getString("data", tmpData, len); + len += 16; + if (track.codec == "AAC"){ + len ++; + } + if ( !checkBufferSize()){ + return false; + } + if (track.codec == "AAC"){ + memcpy(data + 13, tmpData, len - 17); + data[12] = 1; //raw AAC data, not sequence header + }else{ + memcpy(data + 12, tmpData, len - 16); + } + data[11] = 0; + if (track.codec == "AAC"){ + data[11] += 0xA0; + } + if (track.codec == "MP3"){ + data[11] += 0x20; + } + unsigned int datarate = track.rate; + if (datarate >= 44100){ + data[11] += 0x0C; + }else if (datarate >= 22050){ + data[11] += 0x08; + }else if (datarate >= 11025){ + data[11] += 0x04; + } + if (track.size == 16){ + data[11] += 0x02; + } + if (track.channels > 1){ + data[11] += 0x01; + } } + if (!len){ + return false; + } + setLen(); + if (track.type == "video"){data[0] = 0x09;} + if (track.type == "audio"){data[0] = 0x08;} + if (track.type == "meta"){data[0] = 0x12;} data[1] = ((len - 15) >> 16) & 0xFF; data[2] = ((len - 15) >> 8) & 0xFF; data[3] = (len - 15) & 0xFF; data[8] = 0; data[9] = 0; data[10] = 0; - tagTime(S.getPacket()["time"].asInt()); + tagTime(packData.getTime()); + if (packData.getInt("offset")){ + tagTime(packData.getTime() + packData.getInt("offset")); + } return true; } @@ -586,6 +565,86 @@ bool FLV::Tag::DTSCAudioInit(DTSC::Track & audio){ return true; } +bool FLV::Tag::DTSCMetaInit(DTSC::Meta & M, std::set & selTracks){ + AMF::Object amfdata("root", AMF::AMF0_DDV_CONTAINER); + amfdata.addContent(AMF::Object("", "onMetaData")); + amfdata.addContent(AMF::Object("", AMF::AMF0_ECMA_ARRAY)); + AMF::Object trinfo = AMF::Object("trackinfo", AMF::AMF0_STRICT_ARRAY); + int i = 0; + int mediaLen = 0; + for (std::set::iterator it = selTracks.begin(); it != selTracks.end(); it++){ + if (M.tracks[*it].lastms - M.tracks[*it].firstms > mediaLen){ + mediaLen = M.tracks[*it].lastms - M.tracks[*it].firstms; + } + if (M.tracks[*it].type == "video"){ + trinfo.addContent(AMF::Object("", AMF::AMF0_OBJECT)); + trinfo.getContentP(i)->addContent(AMF::Object("length", ((double)M.tracks[*it].lastms / 1000) * ((double)M.tracks[*it].fpks / 1000.0), AMF::AMF0_NUMBER)); + trinfo.getContentP(i)->addContent(AMF::Object("timescale", ((double)M.tracks[*it].fpks / 1000.0), AMF::AMF0_NUMBER)); + trinfo.getContentP(i)->addContent(AMF::Object("sampledescription", AMF::AMF0_STRICT_ARRAY)); + amfdata.getContentP(1)->addContent(AMF::Object("hasVideo", 1, AMF::AMF0_BOOL)); + if (M.tracks[*it].codec == "H264"){ + amfdata.getContentP(1)->addContent(AMF::Object("videocodecid", (std::string)"avc1")); + trinfo.getContentP(i)->getContentP(2)->addContent(AMF::Object("sampletype", (std::string)"avc1")); + } + if (M.tracks[*it].codec == "VP6"){ + amfdata.getContentP(1)->addContent(AMF::Object("videocodecid", 4, AMF::AMF0_NUMBER)); + trinfo.getContentP(i)->getContentP(2)->addContent(AMF::Object("sampletype", (std::string)"vp6")); + } + if (M.tracks[*it].codec == "H263"){ + amfdata.getContentP(1)->addContent(AMF::Object("videocodecid", 2, AMF::AMF0_NUMBER)); + trinfo.getContentP(i)->getContentP(2)->addContent(AMF::Object("sampletype", (std::string)"h263")); + } + amfdata.getContentP(1)->addContent(AMF::Object("width", M.tracks[*it].width, AMF::AMF0_NUMBER)); + amfdata.getContentP(1)->addContent(AMF::Object("height", M.tracks[*it].height, AMF::AMF0_NUMBER)); + amfdata.getContentP(1)->addContent(AMF::Object("videoframerate", (double)M.tracks[*it].fpks / 1000.0, AMF::AMF0_NUMBER)); + amfdata.getContentP(1)->addContent(AMF::Object("videodatarate", (double)M.tracks[*it].bps * 128.0, AMF::AMF0_NUMBER)); + ++i; + } + if (M.tracks[*it].type == "audio"){ + trinfo.addContent(AMF::Object("", AMF::AMF0_OBJECT)); + trinfo.getContentP(i)->addContent(AMF::Object("length", ((double)M.tracks[*it].lastms) * ((double)M.tracks[*it].rate), AMF::AMF0_NUMBER)); + trinfo.getContentP(i)->addContent(AMF::Object("timescale", M.tracks[*it].rate, AMF::AMF0_NUMBER)); + trinfo.getContentP(i)->addContent(AMF::Object("sampledescription", AMF::AMF0_STRICT_ARRAY)); + amfdata.getContentP(1)->addContent(AMF::Object("hasAudio", 1, AMF::AMF0_BOOL)); + amfdata.getContentP(1)->addContent(AMF::Object("audiodelay", 0, AMF::AMF0_NUMBER)); + if (M.tracks[*it].codec == "AAC"){ + amfdata.getContentP(1)->addContent(AMF::Object("audiocodecid", (std::string)"mp4a")); + trinfo.getContentP(i)->getContentP(2)->addContent(AMF::Object("sampletype", (std::string)"mp4a")); + } + if (M.tracks[*it].codec == "MP3"){ + amfdata.getContentP(1)->addContent(AMF::Object("audiocodecid", (std::string)"mp3")); + trinfo.getContentP(i)->getContentP(2)->addContent(AMF::Object("sampletype", (std::string)"mp3")); + } + amfdata.getContentP(1)->addContent(AMF::Object("audiochannels", M.tracks[*it].channels, AMF::AMF0_NUMBER)); + amfdata.getContentP(1)->addContent(AMF::Object("audiosamplerate", M.tracks[*it].rate, AMF::AMF0_NUMBER)); + amfdata.getContentP(1)->addContent(AMF::Object("audiosamplesize", M.tracks[*it].size, AMF::AMF0_NUMBER)); + amfdata.getContentP(1)->addContent(AMF::Object("audiodatarate", (double)M.tracks[*it].bps * 128.0, AMF::AMF0_NUMBER)); + ++i; + } + } + if (M.vod){ + amfdata.getContentP(1)->addContent(AMF::Object("duration", mediaLen/1000, AMF::AMF0_NUMBER)); + } + amfdata.getContentP(1)->addContent(trinfo); + + std::string tmp = amfdata.Pack(); + len = tmp.length() + 15; + if (len <= 0 || !checkBufferSize()){ + return false; + } + memcpy(data + 11, tmp.data(), len - 15); + setLen(); + data[0] = 0x12; + data[1] = ((len - 15) >> 16) & 0xFF; + data[2] = ((len - 15) >> 8) & 0xFF; + data[3] = (len - 15) & 0xFF; + data[8] = 0; + data[9] = 0; + data[10] = 0; + tagTime(0); + return true; +} + /// FLV metadata loader function from DTSC. /// Takes the DTSC metadata and makes it into FLV. /// Assumes metadata is available - so check before calling! @@ -691,7 +750,7 @@ bool FLV::Tag::DTSCMetaInit(DTSC::Stream & S, DTSC::Track & videoRef, DTSC::Trac if (len <= 0 || !checkBufferSize()){ return false; } - memcpy(data + 11, tmp.c_str(), len - 15); + memcpy(data + 11, tmp.data(), len - 15); setLen(); data[0] = 0x12; data[1] = ((len - 15) >> 16) & 0xFF; diff --git a/lib/flv_tag.h b/lib/flv_tag.h index 201b846f..f7dd6bc3 100644 --- a/lib/flv_tag.h +++ b/lib/flv_tag.h @@ -47,8 +47,10 @@ namespace FLV { //loader functions bool ChunkLoader(const RTMPStream::Chunk& O); bool DTSCLoader(DTSC::Stream & S); + bool DTSCLoader(DTSC::Packet & packData, DTSC::Track & track); bool DTSCVideoInit(DTSC::Track & video); bool DTSCAudioInit(DTSC::Track & audio); + bool DTSCMetaInit(DTSC::Meta & M, std::set & selTracks); bool DTSCMetaInit(DTSC::Stream & S, DTSC::Track & videoRef, DTSC::Track & audioRef); JSON::Value toJSON(DTSC::Meta & metadata); bool MemLoader(char * D, unsigned int S, unsigned int & P); diff --git a/lib/json.cpp b/lib/json.cpp index 3bd9ea6d..e44c7234 100644 --- a/lib/json.cpp +++ b/lib/json.cpp @@ -143,7 +143,9 @@ JSON::Value::Value(std::istream & fromstream){ null(); bool reading_object = false; bool reading_array = false; - while (fromstream.good()){ + bool negative = false; + bool stop = false; + while (!stop && fromstream.good()){ int c = fromstream.peek(); switch (c){ case '{': @@ -167,12 +169,16 @@ JSON::Value::Value(std::istream & fromstream){ if ( !reading_object){ myType = STRING; strVal = read_string(c, fromstream); - return; + stop = true; }else{ std::string tmpstr = read_string(c, fromstream); ( *this)[tmpstr] = JSON::Value(fromstream); } break; + case '-': + c = fromstream.get(); + negative = true; + break; case '0': case '1': case '2': @@ -189,7 +195,10 @@ JSON::Value::Value(std::istream & fromstream){ intVal += c - '0'; break; case ',': - if ( !reading_object && !reading_array) return; + if ( !reading_object && !reading_array){ + stop = true; + break; + } c = fromstream.get(); if (reading_array){ append(JSON::Value(fromstream)); @@ -199,33 +208,33 @@ JSON::Value::Value(std::istream & fromstream){ if (reading_object){ c = fromstream.get(); } - return; + stop = true; break; case ']': if (reading_array){ c = fromstream.get(); } - return; + stop = true; break; case 't': case 'T': skipToEnd(fromstream); myType = BOOL; intVal = 1; - return; + stop = true; break; case 'f': case 'F': skipToEnd(fromstream); myType = BOOL; intVal = 0; - return; + stop = true; break; case 'n': case 'N': skipToEnd(fromstream); myType = EMPTY; - return; + stop = true; break; default: c = fromstream.get(); //ignore this character @@ -233,6 +242,9 @@ JSON::Value::Value(std::istream & fromstream){ break; } } + if (negative){ + intVal *= -1; + } } /// Sets this JSON::Value to the given string. @@ -1119,10 +1131,10 @@ JSON::Value JSON::fromDTMI2(std::string & data){ void JSON::fromDTMI2(const unsigned char * data, unsigned int len, unsigned int &i, JSON::Value & ret){ if (len < 13){return;} - long long int tmpTrackID = ntohl(((int*)data)[0]); - long long int tmpTime = ntohl(((int*)data)[1]); + long long int tmpTrackID = ntohl(((int*)(data+i))[0]); + long long int tmpTime = ntohl(((int*)(data+i))[1]); tmpTime <<= 32; - tmpTime += ntohl(((int*)data)[2]); + tmpTime += ntohl(((int*)(data+i))[2]); i += 12; fromDTMI(data, len, i, ret); ret["time"] = tmpTime; diff --git a/lib/ogg.cpp b/lib/ogg.cpp index 0f973f2c..f10e5800 100644 --- a/lib/ogg.cpp +++ b/lib/ogg.cpp @@ -58,9 +58,10 @@ namespace OGG{ if (newData.size()<27){ return false; } - /*if (getMagicNumber() != 0x4f676753){ + if (newData.substr(0, 4) != "OggS"){ + DEBUG_MSG(DLVL_FAIL, "Invalid Ogg page encountered - cannot continue"); return false; - }*/ + } dataSum = 0; if (!checkDataSize(27)){ return false; @@ -81,20 +82,72 @@ namespace OGG{ } if (newData.size() < 27 + getPageSegments() + dataSum){//check input size + dataSum = 0; return false; } if(!checkDataSize(27 + getPageSegments()+dataSum)){ + dataSum = 0; return false; } memcpy(data + 27 + getPageSegments(), newData.c_str() + 27 + getPageSegments(), dataSum); newData.erase(0, getPageSize()); return true; } - - long unsigned int Page::getMagicNumber(){ - return ntohl(((long unsigned int*)(data))[0]); + + + bool Page::read(FILE * inFile){ + segmentTableDeque.clear(); + int oriPos = ftell(inFile); + dataSum = 0; + if (!checkDataSize(27)){ + DEBUG_MSG(DLVL_WARN,"Unable to read a page: memory allocation"); + return false; + } + if (!fread(data, 27, 1, inFile)){ + DEBUG_MSG(DLVL_WARN,"Unable to read a page: fread"); + fseek(inFile, oriPos, SEEK_SET); + return false; + } + if(!checkDataSize(27 + getPageSegments())){ + DEBUG_MSG(DLVL_WARN,"Unable to read a page: memory allocation1"); + return false; + } + if (!fread(data + 27, getPageSegments(), 1, inFile)){ + DEBUG_MSG(DLVL_WARN,"Unable to read a page: fread1"); + fseek(inFile, oriPos, SEEK_SET); + return false; + } + for (unsigned int i = 0; i < getPageSegments(); i++){ + dataSum += data[27 + i]; + } + if (!checkDataSize(27 + getPageSegments() + dataSum)){ + DEBUG_MSG(DLVL_WARN,"Unable to read a page: memory allocation2"); + dataSum = 0; + return false; + } + if ( !fread(data + 27 + getPageSegments(), dataSum, 1, inFile)){ + DEBUG_MSG(DLVL_WARN,"Unable to read a page: fread2"); + fseek(inFile, oriPos, SEEK_SET); + dataSum = 0; + return false; + } + return true; } + bool Page::getSegment(unsigned int index, char * ret, unsigned int & len){ + if (index > segmentTableDeque.size()){ + ret = NULL; + len = 0; + return false; + } + ret = getFullPayload(); + for (int i = 0; i < index; i++){ + ret += segmentTableDeque[i]; + } + len = segmentTableDeque[index]; + return true; + } + void Page::setMagicNumber(){ if(checkDataSize(4)){ memcpy(data, "OggS", 4); @@ -208,34 +261,26 @@ namespace OGG{ DEBUG_MSG(DLVL_ERROR, "Segment too big, create a continue page"); } - /// \todo MAKE FIX HERE bool Page::setSegmentTable(std::vector layout){ dataSum=0; for (unsigned int i = 0; i < layout.size(); i++){ dataSum += layout[i]; } unsigned int place = 0; - char table[255]; + char table[256]; for (unsigned int i = 0; i < layout.size(); i++){ - while (layout[i]>=255){ - if (place > 255){ - STerrMSG(); - return false; - } - table[place] = 255; - layout[i] -= 255; - place++; - } - if (place > 255){ + int amount = (layout[i]/255) + 1; + if (i == layout.size() - 1 && place + amount > (255 + (layout[i] % 255 == 0))){ STerrMSG(); return false; } - if (layout[i] >= 0){//fix somewhere here - table[place] = layout[i]; - if (place<255){//last segment does not need a closing 0 - place++; - } - } + memset(table + place, 255, amount - 1); + table[place + amount - 1] = layout[i] % 255; + place += amount; + } + //Don't send element 256, even if it was filled. + if (place > 255){ + place = 255; } setPageSegments(place); setSegmentTable(table,place); @@ -260,92 +305,42 @@ namespace OGG{ return data + 27 + getPageSegments(); } - bool Page::typeBOS(){ - if (getHeaderType() & 0x02){ - return true; - } - return false; - } - - bool Page::typeEOS(){ - if (getHeaderType() & 0x04){ - return true; - } - return false; - } - - bool Page::typeContinue(){ - if (getHeaderType() & 0x01){ - return true; - } - return false; - } - - bool Page::typeNone(){ - if ((getHeaderType() & 0x07) == 0x00){ - return true; - } - return false; - } - void Page::setInternalCodec(std::string myCodec){ codec = myCodec; } std::string Page::toPrettyString(size_t indent){ std::stringstream r; - r << std::string(indent,' ') << "OGG Page (" << getPageSize() << ")" << std::endl; - r << std::string(indent + 2,' ') << "Magic Number: " << std::string(data, 4) << std::endl; + r << std::string(indent,' ') << "Ogg page (" << getPageSize() << ")" << std::endl; r << std::string(indent + 2,' ') << "Version: " << (int)getVersion() << std::endl; - r << std::string(indent + 2,' ') << "Headertype: " << std::hex << (int)getHeaderType() << std::dec; - if (typeContinue()){ - r << " continued"; + r << std::string(indent + 2,' ') << "Header type:"; + if ( !getHeaderType()){ + r << " Normal"; + }else{ + if (getHeaderType() & Continued){ + r << " Continued"; + } + if (getHeaderType() & BeginOfStream){ + r << " BeginOfStream"; + } + if (getHeaderType() & EndOfStream){ + r << " EndOfStream"; + } } - if (typeBOS()){ - r << " bos"; - } - if (typeEOS()){ - r << " eos"; - } - r << std::endl; - r << std::string(indent + 2,' ') << "Granule Position: " << std::hex << getGranulePosition() << std::dec << std::endl; - r << std::string(indent + 2,' ') << "Bitstream Number: " << getBitstreamSerialNumber() << std::endl; - r << std::string(indent + 2,' ') << "Sequence Number: " << getPageSequenceNumber() << std::endl; + r << " (" << (int)getHeaderType() << ")" << std::endl; + r << std::string(indent + 2,' ') << "Granule position: " << getGranulePosition() << std::endl; + r << std::string(indent + 2,' ') << "Bitstream number: " << getBitstreamSerialNumber() << std::endl; + r << std::string(indent + 2,' ') << "Sequence number: " << getPageSequenceNumber() << std::endl; r << std::string(indent + 2,' ') << "Checksum: " << std::hex << getCRCChecksum() << std::dec << std::endl; //r << " Calced Checksum: " << std::hex << calcChecksum() << std::dec << std::endl; - //r << "CRC_checksum write: " << std::hex << getCRCChecksum()<< std::dec << std::endl; - r << std::string(indent + 2,' ') << "Segments: " << (int)getPageSegments() << std::endl; + r << std::string(indent + 2,' ') << "Payloadsize: " << dataSum << std::endl; + r << std::string(indent + 2,' ') << (int)getPageSegments() << " segments:" << std::endl; + r << std::string(indent + 3,' '); std::deque temp = getSegmentTableDeque(); for (std::deque::iterator i = temp.begin(); i != temp.end(); i++){ - r << std::string(indent + 4,' ') << (*i) << std::endl; - } - r << std::string(indent + 2,' ') << "Payloadsize: " << dataSum << std::endl; - if (codec == "theora"){ - int offset = 0; - for (unsigned int i = 0; i < getSegmentTableDeque().size(); i++){ - theora::header tmpHeader; - int len = getSegmentTableDeque()[i]; - if (tmpHeader.read(getFullPayload()+offset,len)){ - r << tmpHeader.toPrettyString(indent + 4); - } - theora::frame tmpFrame; - if (tmpFrame.read(getFullPayload()+offset,len)){ - r << tmpFrame.toPrettyString(indent + 4); - } - offset += len; - } - }else if(codec == "vorbis"){ - r << "Vorbis Data" << std::endl; - int offset = 0; - for (unsigned int i = 0; i < getSegmentTableDeque().size(); i++){ - vorbis::header tmpHeader; - int len = getSegmentTableDeque()[i]; - if (tmpHeader.read(getFullPayload()+offset,len)){ - r << tmpHeader.toPrettyString(indent + 4); - } - offset += len; - } + r << " " << (*i); } + r << std::endl; return r.str(); } diff --git a/lib/ogg.h b/lib/ogg.h index 4ac01d48..0fe58b5d 100644 --- a/lib/ogg.h +++ b/lib/ogg.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -8,12 +9,20 @@ #include "json.h" namespace OGG{ + + enum HeaderType{ + Continued = 1, + BeginOfStream = 2, + EndOfStream = 4 + }; + class Page{ public: Page(); ~Page(); bool read(std::string & newData); - long unsigned int getMagicNumber(); + bool read(FILE * inFile); + bool getSegment(unsigned int index, char * data, unsigned int & len); void setMagicNumber(); char getVersion(); void setVersion(char newVal = 0); @@ -37,10 +46,6 @@ namespace OGG{ unsigned long int getPageSize(); char* getFullPayload();//returns all segments in the page int getPayloadSize(); - bool typeBOS(); - bool typeEOS(); - bool typeContinue(); - bool typeNone(); std::string toPrettyString(size_t indent = 0); void setInternalCodec(std::string myCodec); long unsigned int calcChecksum(); diff --git a/lib/procs.cpp b/lib/procs.cpp index e2f86da2..ceb4dbf5 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -648,7 +648,7 @@ bool Util::Procs::isActive(std::string name){ std::map::iterator it; for (it = listcopy.begin(); it != listcopy.end(); it++){ if (( *it).second == name){ - if (kill(( *it).first, 0) == 0){ + if (childRunning(( *it).first)){ return true; }else{ plist.erase(( *it).first); diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp new file mode 100644 index 00000000..8f0dbad3 --- /dev/null +++ b/lib/shared_memory.cpp @@ -0,0 +1,608 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "defines.h" +#include "shared_memory.h" + +namespace IPC { + /// Stores a long value of val in network order to the pointer p. + static void htobl(char * p, long val) { + p[0] = (val >> 24) & 0xFF; + p[1] = (val >> 16) & 0xFF; + p[2] = (val >> 8) & 0xFF; + p[3] = val & 0xFF; + } + + static void htobll(char * p, long long val) { + p[0] = (val >> 56) & 0xFF; + p[1] = (val >> 48) & 0xFF; + p[2] = (val >> 40) & 0xFF; + p[3] = (val >> 32) & 0xFF; + p[4] = (val >> 24) & 0xFF; + p[5] = (val >> 16) & 0xFF; + p[6] = (val >> 8) & 0xFF; + p[7] = val & 0xFF; + } + + static void btohl(char * p, long & val) { + val = ((long)p[0] << 24) | ((long)p[1] << 16) | ((long)p[2] << 8) | p[3]; + } + + static void btohll(char * p, long long & val) { + val = ((long long)p[0] << 56) | ((long long)p[1] << 48) | ((long long)p[2] << 40) | ((long long)p[3] << 32) | ((long long)p[4] << 24) | ((long long)p[5] << 16) | ((long long)p[6] << 8) | p[7]; + } + + sharedPage::sharedPage(std::string name_, unsigned int len_, bool master_, bool autoBackoff) : handle(0), name(name_), len(len_), master(master_), mapped(NULL) { + handle = 0; + name = name_; + len = len_; + master = master_; + mapped = 0; + init(name_,len_,master_, autoBackoff); + } + sharedPage::sharedPage(const sharedPage & rhs){ + handle = 0; + name = ""; + len = 0; + master = false; + mapped = 0; + init(rhs.name, rhs.len, rhs.master); + } + sharedPage::operator bool() const { + return mapped != 0; + } + void sharedPage::operator =(sharedPage & rhs){ + init(rhs.name, rhs.len, rhs.master); + rhs.master = false;//Make sure the memory does not get unlinked + } + void sharedPage::init(std::string name_, unsigned int len_, bool master_, bool autoBackoff) { + if (mapped && len){ + munmap(mapped,len); + } + if(master){ + shm_unlink(name.c_str()); + } + if (handle > 0){ + close(handle); + } + handle = 0; + name = name_; + len = len_; + master = master_; + mapped = 0; + if (name.size()){ + handle = shm_open(name.c_str(), ( master ? O_CREAT | O_EXCL : 0 )| O_RDWR, ACCESSPERMS); + if (handle == -1) { + if (master){ + DEBUG_MSG(DLVL_HIGH, "Overwriting old page for %s", name.c_str()); + handle = shm_open(name.c_str(), O_CREAT | O_RDWR, ACCESSPERMS); + }else{ + int i = 0; + while (i < 10 && handle == -1 && autoBackoff){ + i++; + Util::sleep(1000); + handle = shm_open(name.c_str(), O_RDWR, ACCESSPERMS); + } + } + } + if (handle == -1) { + perror(std::string("shm_open for page " + name + " failed").c_str()); + return; + } + if (master){ + if (ftruncate(handle, 0) < 0) { + perror(std::string("ftruncate to zero for page " + name + " failed").c_str()); + return; + } + if (ftruncate(handle, len) < 0) { + perror(std::string("ftruncate to len for page " + name + " failed").c_str()); + return; + } + }else{ + struct stat buffStats; + int xRes = fstat(handle, &buffStats); + if (xRes < 0){ + return; + } + len = buffStats.st_size; + } + mapped = (char*)mmap(0, len, PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0); + if (mapped == MAP_FAILED){ + mapped = 0; + return; + } + } + } + sharedPage::~sharedPage(){ + if (mapped && len){ + munmap(mapped,len); + } + if(master){ + shm_unlink(name.c_str()); + } + if (handle > 0){ + close(handle); + } + } + + sharedFile::sharedFile(std::string name_, unsigned int len_, bool master_, bool autoBackoff) : handle(0), name(name_), len(len_), master(master_), mapped(NULL) { + handle = 0; + name = name_; + len = len_; + master = master_; + mapped = 0; + init(name_,len_,master_, autoBackoff); + } + sharedFile::sharedFile(const sharedPage & rhs){ + handle = 0; + name = ""; + len = 0; + master = false; + mapped = 0; + init(rhs.name, rhs.len, rhs.master); + } + sharedFile::operator bool() const { + return mapped != 0; + } + void sharedFile::operator =(sharedFile & rhs){ + init(rhs.name, rhs.len, rhs.master); + rhs.master = false;//Make sure the memory does not get unlinked + } + void sharedFile::init(std::string name_, unsigned int len_, bool master_, bool autoBackoff) { + if (mapped && len){ + munmap(mapped,len); + } + if(master){ + unlink(name.c_str()); + } + if (handle > 0){ + close(handle); + } + handle = 0; + name = name_; + len = len_; + master = master_; + mapped = 0; + if (name.size()){ + /// \todo Use ACCESSPERMS instead of 0600? + handle = open(name.c_str(), ( master ? O_CREAT | O_TRUNC | O_EXCL : 0 )| O_RDWR, (mode_t)0600); + if (handle == -1) { + if (master){ + DEBUG_MSG(DLVL_HIGH, "Overwriting old file for %s", name.c_str()); + handle = open(name.c_str(), O_CREAT | O_TRUNC | O_RDWR, (mode_t)0600); + }else{ + int i = 0; + while (i < 10 && handle == -1 && autoBackoff){ + i++; + Util::sleep(1000); + handle = open(name.c_str(), O_RDWR, (mode_t)0600); + } + } + } + if (handle == -1) { + perror(std::string("open for file " + name + " failed").c_str()); + return; + } + if (master){ + if (ftruncate(handle, len) < 0) { + perror(std::string("ftruncate to len for file " + name + " failed").c_str()); + return; + } + }else{ + struct stat buffStats; + int xRes = fstat(handle, &buffStats); + if (xRes < 0){ + return; + } + len = buffStats.st_size; + } + mapped = (char*)mmap(0, len, PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0); + if (mapped == MAP_FAILED){ + mapped = 0; + return; + } + } + } + sharedFile::~sharedFile(){ + if (mapped && len){ + munmap(mapped,len); + } + if(master){ + unlink(name.c_str()); + } + if (handle > 0){ + close(handle); + } + } + + statExchange::statExchange(char * _data) : data(_data) {} + + void statExchange::now(long long int time) { + htobll(data, time); + } + + long long int statExchange::now() { + long long int result; + btohll(data, result); + return result; + } + + void statExchange::time(long time) { + htobl(data + 8, time); + } + + long statExchange::time() { + long result; + btohl(data + 8, result); + return result; + } + + void statExchange::lastSecond(long time) { + htobl(data + 12, time); + } + + long statExchange::lastSecond() { + long result; + btohl(data + 12, result); + return result; + } + + void statExchange::down(long long int bytes) { + htobll(data + 16, bytes); + } + + long long int statExchange::down() { + long long int result; + btohll(data + 16, result); + return result; + } + + void statExchange::up(long long int bytes) { + htobll(data + 24, bytes); + } + + long long int statExchange::up() { + long long int result; + btohll(data + 24, result); + return result; + } + + void statExchange::host(std::string name) { + memcpy(data + 32, name.c_str(), std::min((int)name.size(), 16)); + } + + std::string statExchange::host() { + return std::string(data + 32, std::min((int)strlen(data + 32), 16)); + } + + void statExchange::streamName(std::string name) { + memcpy(data + 48, name.c_str(), std::min((int)name.size(), 20)); + } + + std::string statExchange::streamName() { + return std::string(data + 48, std::min((int)strlen(data + 48), 20)); + } + + void statExchange::connector(std::string name) { + memcpy(data + 68, name.c_str(), std::min((int)name.size(), 20)); + } + + std::string statExchange::connector() { + return std::string(data + 68, std::min((int)strlen(data + 68), 20)); + } + + + semGuard::semGuard(sem_t * semaphore) : mySemaphore(semaphore) { + sem_wait(mySemaphore); + } + + semGuard::~semGuard() { + sem_post(mySemaphore); + } + + sharedServer::sharedServer(){ + mySemaphore = 0; + payLen = 0; + hasCounter = false; + amount = 0; + } + + sharedServer::sharedServer(std::string name, int len, bool withCounter){ + sharedServer(); + init(name, len, withCounter); + } + + + void sharedServer::init(std::string name, int len, bool withCounter){ + amount = 0; + if (mySemaphore != SEM_FAILED) { + sem_close(mySemaphore); + } + if (baseName != ""){ + sem_unlink(std::string("/" + baseName).c_str()); + } + myPages.clear(); + baseName = name; + payLen = len; + hasCounter = withCounter; + mySemaphore = sem_open(std::string("/" + baseName).c_str(), O_CREAT | O_EXCL | O_RDWR, ACCESSPERMS, 1); + if (mySemaphore == SEM_FAILED) { + mySemaphore = sem_open(std::string("/" + baseName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + } + if (mySemaphore == SEM_FAILED) { + perror("Creating semaphore failed"); + return; + } + newPage(); + newPage(); + newPage(); + newPage(); + newPage(); + } + + sharedServer::~sharedServer() { + if (mySemaphore != SEM_FAILED) { + sem_close(mySemaphore); + } + sem_unlink(std::string("/" + baseName).c_str()); + } + + sharedServer::operator bool() const { + return myPages.size(); + } + + void sharedServer::newPage() { + semGuard tmpGuard(mySemaphore); + sharedPage tmp(std::string(baseName + (char)(myPages.size() + (int)'A')), (4096 << myPages.size()), true); + myPages.insert(tmp); + tmp.master = false; + DEBUG_MSG(DLVL_WARN, "Added a new page: %s", tmp.name.c_str()); + } + + void sharedServer::deletePage() { + if (myPages.size() == 1) { + DEBUG_MSG(DLVL_WARN, "Can't remove last page for %s", baseName.c_str()); + return; + } + semGuard tmpGuard(mySemaphore); + myPages.erase((*myPages.rbegin())); + } + + bool sharedServer::isInUse(unsigned int id){ + unsigned int i = 0; + for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { + //return if we reached the end + if (!it->mapped || !it->len){ + return false; + } + //not on this page? skip to next. + if (it->len < (id - i)*payLen){ + i += it->len / payLen; + continue; + } + if (hasCounter){ + //counter? return true if it is non-zero. + return (it->mapped[(id - i)*payLen] != 0); + }else{ + //no counter - check the entire size for being all zeroes. + for (unsigned int j = 0; j < payLen; ++j){ + if (it->mapped[(id-i)*payLen+j]){ + return true; + } + } + return false; + } + } + //only happens if we run out of pages + return false; + } + + void sharedServer::parseEach(void (*callback)(char * data, size_t len, unsigned int id)) { + char * empty = 0; + if (!hasCounter) { + empty = (char *)malloc(payLen * sizeof(char)); + memset(empty, 0, payLen); + } + unsigned int id = 0; + for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { + if (!it->mapped || !it->len){ + DEBUG_MSG(DLVL_FAIL, "Something went terribly wrong?"); + break; + } + unsigned int offset = 0; + while (offset + payLen + (hasCounter ? 1 : 0) <= it->len) { + if (hasCounter){ + if (it->mapped[offset] != 0) { + int counter = it->mapped[offset]; + //increase the count if needed + if (id >= amount){ + amount = id+1; + DEBUG_MSG(DLVL_DEVEL, "Shared memory %s is now at count %u", baseName.c_str(), amount); + } + callback(it->mapped + offset + 1, payLen, id); + switch (counter) { + case 127: + DEBUG_MSG(DLVL_HIGH, "Client %u requested disconnect", id); + break; + case 126: + DEBUG_MSG(DLVL_HIGH, "Client %u timed out", id); + break; + case 255: + DEBUG_MSG(DLVL_HIGH, "Client %u disconnected on request", id); + break; + case 254: + DEBUG_MSG(DLVL_HIGH, "Client %u disconnect timed out", id); + break; + default: + break; + } + if (counter == 127 || counter == 126 || counter == 255 || counter == 254) { + memset(it->mapped + offset + 1, 0, payLen); + it->mapped[offset] = 0; + } else { + it->mapped[offset] ++; + } + }else{ + //stop if we're past the amount counted and we're empty + if (id >= amount - 1){ + //bring the counter down if this was the last element + if (id == amount - 1){ + amount = id; + DEBUG_MSG(DLVL_DEVEL, "Shared memory %s is now at count %u", baseName.c_str(), amount); + } + //stop, we're guaranteed no more pages are full at this point + return; + } + } + }else{ + if (memcmp(empty, it->mapped + offset, payLen)) { + //increase the count if needed + if (id >= amount){ + amount = id+1; + DEBUG_MSG(DLVL_DEVEL, "Shared memory %s is now at count %u", baseName.c_str(), amount); + } + callback(it->mapped + offset, payLen, id); + }else{ + //stop if we're past the amount counted and we're empty + if (id >= amount - 1){ + //bring the counter down if this was the last element + if (id == amount - 1){ + amount = id; + DEBUG_MSG(DLVL_DEVEL, "Shared memory %s is now at count %u", baseName.c_str(), amount); + } + //stop, we're guaranteed no more pages are full at this point + if (empty){ + free(empty); + } + return; + } + } + } + offset += payLen + (hasCounter ? 1 : 0); + id ++; + } + } + if (empty){ + free(empty); + } + } + + sharedClient::sharedClient() { + hasCounter = 0; + payLen = 0; + offsetOnPage = 0; + } + + sharedClient::sharedClient(const sharedClient & rhs ) { + baseName = rhs.baseName; + payLen = rhs.payLen; + hasCounter = rhs.hasCounter; + mySemaphore = sem_open(std::string("/" + baseName).c_str(), O_RDWR); + if (mySemaphore == SEM_FAILED) { + perror("Creating semaphore failed"); + return; + } + semGuard tmpGuard(mySemaphore); + myPage.init(rhs.myPage.name,rhs.myPage.len,rhs.myPage.master); + offsetOnPage = rhs.offsetOnPage; + } + + void sharedClient::operator =(const sharedClient & rhs ) { + baseName = rhs.baseName; + payLen = rhs.payLen; + hasCounter = rhs.hasCounter; + mySemaphore = sem_open(std::string("/" + baseName).c_str(), O_RDWR); + if (mySemaphore == SEM_FAILED) { + perror("Creating semaphore failed"); + return; + } + semGuard tmpGuard(mySemaphore); + myPage.init(rhs.myPage.name,rhs.myPage.len,rhs.myPage.master); + offsetOnPage = rhs.offsetOnPage; + } + + sharedClient::sharedClient(std::string name, int len, bool withCounter) : baseName(name), payLen(len), offsetOnPage(-1), hasCounter(withCounter) { + mySemaphore = sem_open(std::string("/" + baseName).c_str(), O_RDWR); + if (mySemaphore == SEM_FAILED) { + perror("Creating semaphore failed"); + return; + } + semGuard tmpGuard(mySemaphore); + char * empty = 0; + if (!hasCounter) { + empty = (char *)malloc(payLen * sizeof(char)); + if (!empty){ + DEBUG_MSG(DLVL_FAIL, "Failed to allocate %u bytes for empty payload!", payLen); + return; + } + memset(empty, 0, payLen); + } + for (char i = 'A'; i <= 'Z'; i++) { + myPage.init(baseName + i, (4096 << (i - 'A'))); + int offset = 0; + while (offset + payLen + (hasCounter ? 1 : 0) <= myPage.len) { + if ((hasCounter && myPage.mapped[offset] == 0) || (!hasCounter && !memcmp(myPage.mapped + offset, empty, payLen))) { + offsetOnPage = offset; + if (hasCounter) { + myPage.mapped[offset] = 1; + } + break; + } + offset += payLen + (hasCounter ? 1 : 0); + } + if (offsetOnPage != -1) { + break; + } + } + free(empty); + } + + sharedClient::~sharedClient() { + if (hasCounter){ + finish(); + } + if (mySemaphore != SEM_FAILED) { + sem_close(mySemaphore); + } + } + + void sharedClient::write(char * data, int len) { + if (hasCounter) { + keepAlive(); + } + memcpy(myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0), data, std::min(len, payLen)); + } + + void sharedClient::finish() { + if (!hasCounter) { + DEBUG_MSG(DLVL_WARN, "Trying to time-out an element without counters"); + return; + } + if (myPage.mapped){ + myPage.mapped[offsetOnPage] = 127; + } + } + + void sharedClient::keepAlive() { + if (!hasCounter) { + DEBUG_MSG(DLVL_WARN, "Trying to keep-alive an element without counters"); + return; + } + if (myPage.mapped[offsetOnPage] < 128){ + myPage.mapped[offsetOnPage] = 1; + }else{ + DEBUG_MSG(DLVL_WARN, "Trying to keep-alive an element that needs to timeout, ignoring"); + } + } + + char * sharedClient::getData() { + if (!myPage.mapped){return 0;} + return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0)); + } +} diff --git a/lib/shared_memory.h b/lib/shared_memory.h new file mode 100644 index 00000000..3fe87896 --- /dev/null +++ b/lib/shared_memory.h @@ -0,0 +1,116 @@ +#pragma once +#include +#include +#include + +#include "timing.h" + +namespace IPC { + + class statExchange { + public: + statExchange(char * _data); + void now(long long int time); + long long int now(); + void time(long time); + long time(); + void lastSecond(long time); + long lastSecond(); + void down(long long int bytes); + long long int down(); + void up(long long int bytes); + long long int up(); + void host(std::string name); + std::string host(); + void streamName(std::string name); + std::string streamName(); + void connector(std::string name); + std::string connector(); + private: + char * data; + }; + + class semGuard { + public: + semGuard(sem_t * semaphore); + ~semGuard(); + private: + sem_t * mySemaphore; + }; + + class sharedPage{ + public: + sharedPage(std::string name_ = "", unsigned int len_ = 0, bool master_ = false, bool autoBackoff = true); + sharedPage(const sharedPage & rhs); + ~sharedPage(); + operator bool() const; + void init(std::string name_, unsigned int len_, bool master_ = false, bool autoBackoff = true); + void operator =(sharedPage & rhs); + bool operator < (const sharedPage & rhs) const { + return name < rhs.name; + } + int handle; + std::string name; + long long int len; + bool master; + char * mapped; + }; + + class sharedFile{ + public: + sharedFile(std::string name_ = "", unsigned int len_ = 0, bool master_ = false, bool autoBackoff = true); + sharedFile(const sharedPage & rhs); + ~sharedFile(); + operator bool() const; + void init(std::string name_, unsigned int len_, bool master_ = false, bool autoBackoff = true); + void operator =(sharedFile & rhs); + bool operator < (const sharedFile & rhs) const { + return name < rhs.name; + } + int handle; + std::string name; + long long int len; + bool master; + char * mapped; + }; + + class sharedServer{ + public: + sharedServer(); + sharedServer(std::string name, int len, bool withCounter = false); + void init(std::string name, int len, bool withCounter = false); + ~sharedServer(); + void parseEach(void (*callback)(char * data, size_t len, unsigned int id)); + operator bool() const; + unsigned int amount; + private: + bool isInUse(unsigned int id); + void newPage(); + void deletePage(); + std::string baseName; + unsigned int payLen; + std::set myPages; + sem_t * mySemaphore; + bool hasCounter; + }; + + class sharedClient{ + public: + sharedClient(); + sharedClient(const sharedClient & rhs); + sharedClient(std::string name, int len, bool withCounter = false); + void operator = (const sharedClient & rhs); + ~sharedClient(); + void write(char * data, int len); + void finish(); + void keepAlive(); + char * getData(); + private: + std::string baseName; + sharedPage myPage; + sem_t * mySemaphore; + int payLen; + int offsetOnPage; + bool hasCounter; + }; +} diff --git a/lib/socket.cpp b/lib/socket.cpp index 7e0cac1a..818d4b6d 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -153,6 +153,11 @@ std::string & Socket::Buffer::get(){ } } +/// Completely empties the buffer +void Socket::Buffer::clear(){ + data.clear(); +} + /// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection. /// \param sockNo Integer representing the socket to convert. Socket::Connection::Connection(int sockNo){ @@ -387,6 +392,11 @@ bool Socket::Connection::connected() const{ return (sock >= 0) || ((pipes[0] >= 0) && (pipes[1] >= 0)); } +/// Returns the time this socket has been connected. +unsigned int Socket::Connection::connTime(){ + return conntime; +} + /// Returns total amount of bytes sent. unsigned int Socket::Connection::dataUp(){ return up; @@ -647,6 +657,48 @@ Socket::Connection::operator bool() const{ return connected(); } +/// Returns true if the given address can be matched with the remote host. +/// Can no longer return true after any socket error have occurred. +bool Socket::Connection::isAddress(std::string addr){ + struct addrinfo *result, *rp, hints; + memset( &hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_ADDRCONFIG | AI_V4MAPPED; + hints.ai_protocol = 0; + hints.ai_canonname = NULL; + hints.ai_addr = NULL; + hints.ai_next = NULL; + int s = getaddrinfo(addr.c_str(), 0, &hints, &result); + DEBUG_MSG(DLVL_DEVEL, "Meh: %s", addr.c_str()); + if (s != 0){ + return false; + } + + char newaddr[INET_ADDRSTRLEN]; + newaddr[0] = 0; + for (rp = result; rp != NULL; rp = rp->ai_next){ + if (rp->ai_family == AF_INET && inet_ntop(rp->ai_family, &(((sockaddr_in*)rp->ai_addr)->sin_addr), newaddr, INET_ADDRSTRLEN)){ + DEBUG_MSG(DLVL_DEVEL, "Comparing: '%s' to '%s'", remotehost.c_str(), newaddr); + if (remotehost == newaddr){ + return true; + } + DEBUG_MSG(DLVL_DEVEL, "Comparing: '%s' to '::ffff:%s'", remotehost.c_str(), newaddr); + if (remotehost == std::string("::ffff:")+newaddr){ + return true; + } + } + if (rp->ai_family == AF_INET6 && inet_ntop(rp->ai_family, &(((sockaddr_in6*)rp->ai_addr)->sin6_addr), newaddr, INET_ADDRSTRLEN)){ + DEBUG_MSG(DLVL_DEVEL, "Comparing: '%s' to '%s'", remotehost.c_str(), newaddr); + if (remotehost == newaddr){ + return true; + } + } + } + freeaddrinfo(result); + return false; +} + /// Create a new base Server. The socket is never connected, and a placeholder for later connections. Socket::Server::Server(){ sock = -1; diff --git a/lib/socket.h b/lib/socket.h index fa34ed3d..7434467e 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -39,6 +39,7 @@ namespace Socket { bool available(unsigned int count); std::string remove(unsigned int count); std::string copy(unsigned int count); + void clear(); }; //Buffer @@ -76,6 +77,7 @@ namespace Socket { int getSocket(); ///< Returns internal socket number. std::string getError(); ///< Returns a string describing the last error that occured. bool connected() const; ///< Returns the connected-state for this socket. + bool isAddress(std::string addr); //buffered i/o methods bool spool(); ///< Updates the downbuffer and upbuffer internal variables. bool flush(); ///< Updates the downbuffer and upbuffer internal variables until upbuffer is empty. @@ -87,6 +89,7 @@ namespace Socket { void SendNow(const char * data); ///< Will not buffer anything but always send right away. Blocks. void SendNow(const char * data, size_t len); ///< Will not buffer anything but always send right away. Blocks. //stats related methods + unsigned int connTime();///< Returns the time this socket has been connected. unsigned int dataUp(); ///< Returns total amount of bytes sent. unsigned int dataDown(); ///< Returns total amount of bytes received. std::string getStats(std::string C); ///< Returns a std::string of stats, ended by a newline. diff --git a/lib/stream.cpp b/lib/stream.cpp index 01056959..0d473c1e 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -60,20 +60,56 @@ void Util::Stream::sanitizeName(std::string & streamname){ } Socket::Connection Util::Stream::getLive(std::string streamname){ - return Socket::Connection(getTmpFolder() + "stream_" + streamname); + JSON::Value ServConf = JSON::fromFile(getTmpFolder() + "streamlist"); + static unsigned long long counter = 0; + std::stringstream name; + name << "MistInBuffer " << (counter++); + std::string player_bin = Util::getMyPath() + "MistInBuffer"; + DEBUG_MSG(DLVL_WARN, "Starting %s -p -s %s", player_bin.c_str(), streamname.c_str()); + char* argv[15] = {(char*)player_bin.c_str(), (char*)"-p", (char*)"-s", (char*)streamname.c_str(), (char*)0}; + int argNum = 4; + if (ServConf["streams"][streamname].isMember("DVR")){ + std::string bufferTime = ServConf["streams"][streamname]["DVR"].asString(); + argv[argNum++] = (char*)"-b"; + argv[argNum++] = (char*)bufferTime.c_str(); + argv[argNum++] = (char*)0; + } + + int pid = fork(); + if (pid){ + execvp(argv[0], argv); + _exit(42); + }else if(pid == -1){ + perror("Could not start vod"); + } + return Socket::Connection(); } /// Starts a process for a VoD stream. Socket::Connection Util::Stream::getVod(std::string filename, std::string streamname){ static unsigned long long counter = 0; std::stringstream name; - name << "MistPlayer " << (counter++); - std::string player_bin = Util::getMyPath() + "MistPlayer"; - char* const argv[] = {(char*)player_bin.c_str(), (char*)filename.c_str(), (char*)"-s", (char*)streamname.c_str(), (char*)0}; - int fdin = -1, fdout = -1, fderr = fileno(stderr); - Util::Procs::StartPiped(name.str(), argv, &fdin, &fdout, &fderr); - // if StartPiped fails then fdin and fdout will be unmodified (-1) - return Socket::Connection(fdin, fdout); + name << "MistInDTSC " << (counter++); + std::string player_bin = Util::getMyPath() + "MistInDTSC"; + if (filename.substr(filename.size()-5) == ".ismv"){ + name.str("MistInISMV " + filename); + player_bin = Util::getMyPath() + "MistInISMV"; + } + if (filename.substr(filename.size()-4) == ".flv"){ + name.str("MistInFLV " + filename); + player_bin = Util::getMyPath() + "MistInFLV"; + } + DEBUG_MSG(DLVL_WARN, "Starting %s -p -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str()); + char* const argv[] = {(char*)player_bin.c_str(), (char*)"-p", (char*)"-s", (char*)streamname.c_str(), (char*)filename.c_str(), (char*)0}; + + int pid = fork(); + if (pid){ + execvp(argv[0], argv); + _exit(42); + }else if(pid == -1){ + perror("Could not start vod"); + } + return Socket::Connection(); } /// Probe for available streams. Currently first VoD, then Live. @@ -84,7 +120,7 @@ Socket::Connection Util::Stream::getStream(std::string streamname){ if (ServConf["streams"][streamname]["source"].asString()[0] == '/'){ return getVod(ServConf["streams"][streamname]["source"].asString(), streamname); }else{ - return Socket::Connection(getTmpFolder() + "stream_" + streamname); + return getLive(streamname); } } DEBUG_MSG(DLVL_ERROR, "Stream not found: %s", streamname.c_str()); diff --git a/lib/theora.cpp b/lib/theora.cpp index 152f318c..fd0bf6ea 100644 --- a/lib/theora.cpp +++ b/lib/theora.cpp @@ -53,6 +53,12 @@ namespace theora{ datasize = 0; } + header::header(char * newData, unsigned int length){ + data = NULL; + datasize = 0; + read(newData, length); + } + bool header::validateIdentificationHeader(){ if (datasize != 42){return false;} if (getHeaderType() != 0){return false;} diff --git a/lib/theora.h b/lib/theora.h index e3ab70b4..5468dbca 100644 --- a/lib/theora.h +++ b/lib/theora.h @@ -7,6 +7,7 @@ namespace theora{ class header{ public: header(); + header(char* newData, unsigned int length); bool read(char* newData, unsigned int length); int getHeaderType(); char getVMAJ(); diff --git a/lib/timing.cpp b/lib/timing.cpp index 582e4f01..27fbef4e 100644 --- a/lib/timing.cpp +++ b/lib/timing.cpp @@ -42,6 +42,20 @@ long long int Util::getMS(){ return ((long long int)t.tv_sec) * 1000 + t.tv_nsec / 1000000; } +/// Gets the current time in microseconds. +long long unsigned int Util::getMicros(){ + struct timespec t; + clock_gettime(CLOCK_REALTIME, &t); + return ((long long unsigned int)t.tv_sec) * 1000000 + t.tv_nsec / 1000; +} + +/// Gets the time difference in microseconds. +long long unsigned int Util::getMicros(long long unsigned int previous){ + struct timespec t; + clock_gettime(CLOCK_REALTIME, &t); + return ((long long unsigned int)t.tv_sec) * 1000000 + t.tv_nsec / 1000 - previous; +} + /// Gets the amount of seconds since 01/01/1970. long long int Util::epoch(){ return time(0); diff --git a/lib/timing.h b/lib/timing.h index f328bcd5..9cb0c837 100644 --- a/lib/timing.h +++ b/lib/timing.h @@ -6,5 +6,7 @@ namespace Util { void sleep(int ms); ///< Sleeps for the indicated amount of milliseconds or longer. long long int getMS(); ///< Gets the current time in milliseconds. + long long unsigned int getMicros();///> 16), (char)(CMN >> 8), (char)CMN); exit(1); } unsigned short codebook_dimensions = stream.get(16); diff --git a/lib/vorbis.h b/lib/vorbis.h index 139ba3d2..7b502276 100644 --- a/lib/vorbis.h +++ b/lib/vorbis.h @@ -20,6 +20,7 @@ namespace vorbis{ class header{ public: header(); + header(char* newData, unsigned int length); bool read(char* newData, unsigned int length); int getHeaderType(); long unsigned int getVorbisVersion();