/// \file dtsc.cpp /// Holds all code for DDVTECH Stream Container parsing/generation. #include "bitfields.h" #include "defines.h" #include "dtsc.h" #include "encode.h" #include "shared_memory.h" #include "util.h" #include "stream.h" #include //for htonl/ntohl #include #include #include #include namespace DTSC{ char Magic_Header[] = "DTSC"; char Magic_Packet[] = "DTPD"; char Magic_Packet2[] = "DTP2"; char Magic_Command[] = "DTCM"; /// If non-zero, this variable will override any live jitter value calculations with the set value uint64_t veryUglyJitterOverride = 0; /// The mask that the current process will use to check if a track is valid uint8_t trackValidMask = TRACK_VALID_ALL; /// The mask that will be set by the current process for new tracks uint8_t trackValidDefault = TRACK_VALID_ALL; /// Default constructor for packets - sets a null pointer and invalid packet. Packet::Packet(){ data = NULL; bufferLen = 0; dataLen = 0; master = false; version = DTSC_INVALID; prevNalSize = 0; } /// Copy constructor for packets, copies an existing packet with same noCopy flag as original. Packet::Packet(const Packet &rhs, size_t idx){ master = false; bufferLen = 0; data = NULL; if (rhs.data && rhs.dataLen){ reInit(rhs.data, rhs.dataLen); if (idx != INVALID_TRACK_ID){Bit::htobl(data + 8, idx);} }else{ null(); } } /// Data constructor for packets, either references or copies a packet from raw data. Packet::Packet(const char *data_, unsigned int len, bool noCopy){ master = false; bufferLen = 0; data = NULL; reInit(data_, len, noCopy); } /// This destructor clears frees the data pointer if the packet was not a reference. Packet::~Packet(){ if (master && data){free(data);} } /// Copier for packets, copies an existing packet with same noCopy flag as original. /// If going from copy to noCopy, this will free the data pointer first. void Packet::operator=(const Packet &rhs){ if (master && !rhs.master){null();} if (rhs && rhs.data && rhs.dataLen){ reInit(rhs.data, rhs.dataLen, !rhs.master); }else{ null(); } } /// Returns true if the packet is deemed valid, false otherwise. /// Valid packets have a length of at least 8, known header type, and length equal to the length /// set in the header. Packet::operator bool() const{ if (!data){ DONTEVEN_MSG("No data"); return false; } if (dataLen < 8){ VERYHIGH_MSG("Datalen < 8"); return false; } if (version == DTSC_INVALID){ VERYHIGH_MSG("No valid version"); return false; } if (ntohl(((int *)data)[1]) + 8 > dataLen){ VERYHIGH_MSG("Length mismatch"); return false; } return true; } /// Returns the recognized packet type. /// This type is set by reInit and all constructors, and then just referenced from there on. packType Packet::getVersion() const{return version;} /// Resets this packet back to the same state as if it had just been freshly constructed. /// If needed, this frees the data pointer. void Packet::null(){ if (master && data){free(data);} master = false; data = NULL; bufferLen = 0; dataLen = 0; version = DTSC_INVALID; } /// Internally used resize function for when operating in copy mode and the internal buffer is too /// small. It will only resize up, never down. ///\param len The length th scale the buffer up to if necessary void Packet::resize(size_t len){ if (master && len > bufferLen){ char *tmp = (char *)realloc(data, len); if (tmp){ data = tmp; bufferLen = len; }else{ FAIL_MSG("Out of memory on parsing a packet"); } } } void Packet::reInit(Socket::Connection &src){ int sleepCount = 0; null(); Util::ResizeablePointer ptr; while (src.connected()){ if (!ptr.rsize() && src.Received().available(8)){ if (src.Received().copy(2) != "DT"){ WARN_MSG("Invalid DTSC Packet header encountered (%s)", Encodings::Hex::encode(src.Received().copy(4)).c_str()); break; } ptr.allocate(Bit::btohl(src.Received().copy(8).data() + 4) + 8); } unsigned int readable = src.Received().bytes(ptr.rsize() - ptr.size()); if (ptr.rsize() && readable){ src.Received().remove(ptr, readable); if (ptr.size() == ptr.rsize()){ reInit(ptr, ptr.size()); return; } } if (!src.spool()){ if (sleepCount++ > 750){ WARN_MSG("Waiting for packet on connection timed out"); return; } Util::sleep(20); } } } ///\brief Initializes a packet with new data ///\param data_ The new data for the packet ///\param len The length of the data pointed to by data_ ///\param noCopy Determines whether to make a copy or not void Packet::reInit(const char *data_, unsigned int len, bool noCopy){ if (!data_){ WARN_MSG("ReInit received a null pointer with len %d, nulling", len); null(); return; } if (!data_[0] && !data_[1] && !data_[2] && !data_[3]){ null(); return; } if (data_[0] != 'D' || data_[1] != 'T'){ unsigned int twlen = len; if (twlen > 20){twlen = 20;} HIGH_MSG("ReInit received a pointer that didn't start with 'DT' but with %s (%u) - data " "corruption?", JSON::Value(std::string(data_, twlen)).toString().c_str(), len); null(); return; } if (len <= 0){len = ntohl(((int *)data_)[1]) + 8;} // clear any existing controlled contents if (master && noCopy){null();} // set control flag to !noCopy master = !noCopy; // either copy the data, or only the pointer, depending on flag if (noCopy){ data = (char *)data_; }else{ resize(len); memcpy(data, data_, len); } // check header type and store packet length dataLen = len; version = DTSC_INVALID; if (len < 4){ FAIL_MSG("ReInit received a packet with size < 4"); return; } if (!memcmp(data, Magic_Packet2, 4)){version = DTSC_V2;} if (!memcmp(data, Magic_Packet, 4)){version = DTSC_V1;} if (!memcmp(data, Magic_Header, 4)){version = DTSC_HEAD;} if (!memcmp(data, Magic_Command, 4)){version = DTCM;} if (version == DTSC_INVALID){FAIL_MSG("ReInit received a packet with invalid header");} } /// Re-initializes this Packet to contain a generic DTSC packet with the given data fields. /// When given a NULL pointer, the data is reserved and memset to 0 /// If given a NULL pointer and a zero size, an empty packet is created. void Packet::genericFill(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData, size_t packDataSize, uint64_t packBytePos, bool isKeyframe){ null(); master = true; // time and trackID are part of the 20-byte header. // the container object adds 4 bytes (plus 2+namelen for each content, see below) // offset, if non-zero, adds 9 bytes (integer type) and 8 bytes (2+namelen) // bpos, if >= 0, adds 9 bytes (integer type) and 6 bytes (2+namelen) // keyframe, if true, adds 9 bytes (integer type) and 10 bytes (2+namelen) // data adds packDataSize+5 bytes (string type) and 6 bytes (2+namelen) if (packData && packDataSize < 1){ FAIL_MSG("Attempted to fill a packet with %zu bytes for timestamp %" PRIu64 ", track %" PRIu32 "!", packDataSize, packTime, packTrack); return; } unsigned int sendLen = 24 + (packOffset ? 17 : 0) + (packBytePos ? 15 : 0) + (isKeyframe ? 19 : 0) + packDataSize + 11; resize(sendLen); // set internal variables version = DTSC_V2; dataLen = sendLen; // write the first 20 bytes memcpy(data, "DTP2", 4); Bit::htobl(data + 4, sendLen - 8); Bit::htobl(data + 8, packTrack); Bit::htobll(data + 12, packTime); data[20] = 0xE0; // start container object unsigned int offset = 21; if (packOffset){ memcpy(data + offset, "\000\006offset\001", 9); Bit::htobll(data + offset + 9, packOffset); offset += 17; } if (packBytePos){ memcpy(data + offset, "\000\004bpos\001", 7); Bit::htobll(data + offset + 7, packBytePos); offset += 15; } if (isKeyframe){ memcpy(data + offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19); offset += 19; } memcpy(data + offset, "\000\004data\002", 7); Bit::htobl(data + offset + 7, packDataSize); memcpy(data + offset + 11, packData ? packData : 0, packDataSize); // finish container with 0x0000EE memcpy(data + offset + 11 + packDataSize, "\000\000\356", 3); } /// sets the keyframe byte. void Packet::setKeyFrame(bool kf){ uint32_t offset = 23; while (data[offset] != 'd' && data[offset] != 'k' && data[offset] != 'K'){ switch (data[offset]){ case 'o': offset += 17; break; case 'b': offset += 15; break; default: FAIL_MSG("Unknown field: %c", data[offset]); } } if (data[offset] == 'k' || data[offset] == 'K'){ data[offset] = (kf ? 'k' : 'K'); data[offset + 16] = (kf ? 1 : 0); }else{ ERROR_MSG("Could not set keyframe - field not found!"); } } void Packet::appendData(const char *appendData, uint32_t appendLen){ resize(dataLen + appendLen); memcpy(data + dataLen - 3, appendData, appendLen); memcpy(data + dataLen - 3 + appendLen, "\000\000\356", 3); // end container dataLen += appendLen; Bit::htobl(data + 4, Bit::btohl(data + 4) + appendLen); uint32_t offset = getDataStringLenOffset(); Bit::htobl(data + offset, Bit::btohl(data + offset) + appendLen); } void Packet::appendNal(const char *appendData, uint32_t appendLen){ if (appendLen == 0){return;} resize(dataLen + appendLen + 4); Bit::htobl(data + dataLen - 3, appendLen); memcpy(data + dataLen - 3 + 4, appendData, appendLen); memcpy(data + dataLen - 3 + 4 + appendLen, "\000\000\356", 3); // end container dataLen += appendLen + 4; Bit::htobl(data + 4, Bit::btohl(data + 4) + appendLen + 4); uint32_t offset = getDataStringLenOffset(); Bit::htobl(data + offset, Bit::btohl(data + offset) + appendLen + 4); prevNalSize = appendLen; } void Packet::upgradeNal(const char *appendData, uint32_t appendLen){ if (appendLen == 0){return;} uint64_t sizeOffset = dataLen - 3 - 4 - prevNalSize; if (Bit::btohl(data + sizeOffset) != prevNalSize){ FAIL_MSG("PrevNalSize state not correct"); return; } resize(dataLen + appendLen); // Not + 4 as size bytes have already been written here. Bit::htobl(data + sizeOffset, prevNalSize + appendLen); prevNalSize += appendLen; memcpy(data + dataLen - 3, appendData, appendLen); memcpy(data + dataLen - 3 + appendLen, "\000\000\356", 3); // end container dataLen += appendLen; Bit::htobl(data + 4, Bit::btohl(data + 4) + appendLen); uint32_t offset = getDataStringLenOffset(); Bit::htobl(data + offset, Bit::btohl(data + offset) + appendLen); } uint32_t Packet::getDataStringLen(){return Bit::btohl(data + getDataStringLenOffset());} /// Method can only be used when using internal functions to build the data. size_t Packet::getDataStringLenOffset(){ size_t offset = 23; while (data[offset] != 'd'){ switch (data[offset]){ case 'o': offset += 17; break; case 'b': offset += 15; break; case 'k': offset += 19; break; case 'K': offset += 19; break; default: FAIL_MSG("Unknown field: %c", data[offset]); return -1; } } return offset + 5; } /// Helper function for skipping over whole DTSC parts static char *skipDTSC(char *p, char *max){ if (p + 1 >= max || p[0] == 0x00){ return 0; // out of packet! 1 == error } if (p[0] == DTSC_INT){ // int, skip 9 bytes to next value return p + 9; } if (p[0] == DTSC_STR){ if (p + 4 >= max){ return 0; // out of packet! } return p + 5 + Bit::btohl(p + 1); } if (p[0] == DTSC_OBJ || p[0] == DTSC_CON){ p++; // object, scan contents while (p < max && p[0] + p[1] != 0){// while not encountering 0x0000 (we assume 0x0000EE) if (p + 2 >= max){ return 0; // out of packet! } p += 2 + Bit::btohs(p); // skip size // otherwise, search through the contents, if needed, and continue p = skipDTSC(p, max); if (!p){return 0;} } return p + 3; } if (p[0] == DTSC_ARR){ p++; // array, scan contents while (p < max && p[0] + p[1] != 0){// while not encountering 0x0000 (we assume 0x0000EE) // search through contents... p = skipDTSC(p, max); if (!p){return 0;} } return p + 3; // skip end marker } return 0; // out of packet! 1 == error } ///\brief Retrieves a single parameter as a string ///\param identifier The name of the parameter ///\param result A location on which the string will be returned ///\param len An integer in which the length of the string will be returned void Packet::getString(const char *identifier, char *&result, size_t &len) const{ getScan().getMember(identifier).getString(result, len); } ///\brief Retrieves a single parameter as a string ///\param identifier The name of the parameter ///\param result The string in which to store the result void Packet::getString(const char *identifier, std::string &result) const{ result = getScan().getMember(identifier).asString(); } ///\brief Retrieves a single parameter as an integer ///\param identifier The name of the parameter ///\param result The result is stored in this integer void Packet::getInt(const char *identifier, uint64_t &result) const{ result = getScan().getMember(identifier).asInt(); } ///\brief Retrieves a single parameter as an integer ///\param identifier The name of the parameter ///\result The requested parameter as an integer uint64_t Packet::getInt(const char *identifier) const{ uint64_t result; getInt(identifier, result); return result; } ///\brief Retrieves a single parameter as a boolean ///\param identifier The name of the parameter ///\param result The result is stored in this boolean void Packet::getFlag(const char *identifier, bool &result) const{ uint64_t result_; getInt(identifier, result_); result = result_; } ///\brief Retrieves a single parameter as a boolean ///\param identifier The name of the parameter ///\result The requested parameter as a boolean bool Packet::getFlag(const char *identifier) const{ bool result; getFlag(identifier, result); return result; } ///\brief Checks whether a parameter exists ///\param identifier The name of the parameter ///\result Whether the parameter exists or not bool Packet::hasMember(const char *identifier) const{ return getScan().getMember(identifier).getType() > 0; } ///\brief Returns the timestamp of the packet. ///\return The timestamp of this packet. uint64_t Packet::getTime() const{ if (version != DTSC_V2){ if (!data){return 0;} return getInt("time"); } return Bit::btohll(data + 12); } void Packet::setTime(uint64_t _time){ if (!master){ INFO_MSG("Can't set the time for this packet, as it is not master."); return; } Bit::htobll(data + 12, _time); } void Packet::nullMember(const std::string & memb){ if (!master){ INFO_MSG("Can't null '%s' for this packet, as it is not master.", memb.c_str()); return; } getScan().nullMember(memb); } ///\brief Returns the track id of the packet. ///\return The track id of this packet. size_t Packet::getTrackId() const{ if (version != DTSC_V2){return getInt("trackid");} return Bit::btohl(data + 8); } ///\brief Returns a pointer to the payload of this packet. ///\return A pointer to the payload of this packet. char *Packet::getData() const{return data;} ///\brief Returns the size of this packet. ///\return The size of this packet. uint32_t Packet::getDataLen() const{return dataLen;} ///\brief Returns the size of the payload of this packet. ///\return The size of the payload of this packet. uint32_t Packet::getPayloadLen() const{ if (version == DTSC_V2){ return dataLen - 20; }else{ return dataLen - 8; } } /// Returns a DTSC::Scan instance to the contents of this packet. /// May return an invalid instance if this packet is invalid. Scan Packet::getScan() const{ if (!*this || !getDataLen() || !getPayloadLen() || getDataLen() <= getPayloadLen()){ return Scan(); } return Scan(data + (getDataLen() - getPayloadLen()), getPayloadLen()); } ///\brief Converts the packet into a JSON value ///\return A JSON::Value representation of this packet. JSON::Value Packet::toJSON() const{ JSON::Value result; uint32_t i = 8; if (getVersion() == DTSC_V1){JSON::fromDTMI(data, dataLen, i, result);} if (getVersion() == DTSC_V2){JSON::fromDTMI2(data, dataLen, i, result);} return result; } std::string Packet::toSummary() const{ std::stringstream out; char *res = 0; size_t len = 0; getString("data", res, len); out << getTrackId() << "@" << getTime() << ": " << len << " bytes"; if (hasMember("keyframe")){out << " (keyframe)";} return out.str(); } /// Create an invalid DTSC::Scan object by default. Scan::Scan(){ p = 0; len = 0; } /// Create a DTSC::Scan object from memory pointer. Scan::Scan(char *pointer, size_t length){ p = pointer; len = length; } /// Returns whether the DTSC::Scan object contains valid data. Scan::operator bool() const{return (p && len);} /// Returns an object representing the named indice of this object. /// Returns an invalid object if this indice doesn't exist or this isn't an object type. Scan Scan::getMember(const std::string &indice) const{ return getMember(indice.data(), indice.size()); } /// Returns an object representing the named indice of this object. /// Returns an invalid object if this indice doesn't exist or this isn't an object type. Scan Scan::getMember(const char *indice, const size_t ind_len) const{ if (getType() != DTSC_OBJ && getType() != DTSC_CON){return Scan();} char *i = p + 1; // object, scan contents while (i[0] + i[1] != 0 && i < p + len){// while not encountering 0x0000 (we assume 0x0000EE) if (i + 2 >= p + len){ return Scan(); // out of packet! } uint16_t strlen = Bit::btohs(i); i += 2; if (ind_len == strlen && strncmp(indice, i, strlen) == 0){ return Scan(i + strlen, len - (i - p)); } i = skipDTSC(i + strlen, p + len); if (!i){return Scan();} } return Scan(); } /// If this is an object type and contains the given indice/len, sets the indice name to all zeroes. void Scan::nullMember(const std::string & indice){ nullMember(indice.data(), indice.size()); } /// If this is an object type and contains the given indice/len, sets the indice name to all zeroes. void Scan::nullMember(const char * indice, const size_t ind_len){ if (getType() != DTSC_OBJ && getType() != DTSC_CON){return;} char * i = p + 1; //object, scan contents while (i[0] + i[1] != 0 && i < p + len){//while not encountering 0x0000 (we assume 0x0000EE) if (i + 2 >= p + len){ return;//out of packet! } uint16_t strlen = Bit::btohs(i); i += 2; if (ind_len == strlen && strncmp(indice, i, strlen) == 0){ memset(i, 0, strlen); return; } i = skipDTSC(i + strlen, p + len); if (!i){return;} } return; } /// Returns an object representing the named indice of this object. /// Returns an invalid object if this indice doesn't exist or this isn't an object type. bool Scan::hasMember(const std::string &indice) const{ return getMember(indice.data(), indice.size()); } /// Returns whether an object representing the named indice of this object exists. /// Returns false if this indice doesn't exist or this isn't an object type. bool Scan::hasMember(const char *indice, const size_t ind_len) const{ return getMember(indice, ind_len); } /// Returns an object representing the named indice of this object. /// Returns an invalid object if this indice doesn't exist or this isn't an object type. Scan Scan::getMember(const char *indice) const{return getMember(indice, strlen(indice));} /// Returns the amount of indices if an array, the amount of members if an object, or zero /// otherwise. size_t Scan::getSize() const{ uint32_t arr_indice = 0; if (getType() == DTSC_ARR){ char *i = p + 1; // array, scan contents while (i[0] + i[1] != 0 && i < p + len){// while not encountering 0x0000 (we assume // 0x0000EE) // search through contents... arr_indice++; i = skipDTSC(i, p + len); if (!i){break;} } } if (getType() == DTSC_OBJ || getType() == DTSC_CON){ char *i = p + 1; // object, scan contents while (i[0] + i[1] != 0 && i < p + len){// while not encountering 0x0000 (we assume // 0x0000EE) if (i + 2 >= p + len){ return Scan(); // out of packet! } uint16_t strlen = Bit::btohs(i); i += 2; arr_indice++; i = skipDTSC(i + strlen, p + len); if (!i){break;} } } return arr_indice; } /// Returns an object representing the num-th indice of this array. /// If not an array but an object, it returns the num-th member, instead. /// Returns an invalid object if this indice doesn't exist or this isn't an array or object type. Scan Scan::getIndice(size_t num) const{ if (getType() == DTSC_ARR){ char *i = p + 1; unsigned int arr_indice = 0; // array, scan contents while (i[0] + i[1] != 0 && i < p + len){// while not encountering 0x0000 (we assume // 0x0000EE) // search through contents... if (arr_indice == num){ return Scan(i, len - (i - p)); }else{ arr_indice++; i = skipDTSC(i, p + len); if (!i){return Scan();} } } } if (getType() == DTSC_OBJ || getType() == DTSC_CON){ char *i = p + 1; unsigned int arr_indice = 0; // object, scan contents while (i[0] + i[1] != 0 && i < p + len){// while not encountering 0x0000 (we assume // 0x0000EE) if (i + 2 >= p + len){ return Scan(); // out of packet! } unsigned int strlen = Bit::btohs(i); i += 2; if (arr_indice == num){ return Scan(i + strlen, len - (i - p)); }else{ arr_indice++; i = skipDTSC(i + strlen, p + len); if (!i){return Scan();} } } } return Scan(); } /// Returns the name of the num-th member of this object. /// Returns an empty string on error or when not an object. std::string Scan::getIndiceName(size_t num) const{ if (getType() == DTSC_OBJ || getType() == DTSC_CON){ char *i = p + 1; unsigned int arr_indice = 0; // object, scan contents while (i[0] + i[1] != 0 && i < p + len){// while not encountering 0x0000 (we assume // 0x0000EE) if (i + 2 >= p + len){ return ""; // out of packet! } unsigned int strlen = Bit::btohs(i); i += 2; if (arr_indice == num){ return std::string(i, strlen); }else{ arr_indice++; i = skipDTSC(i + strlen, p + len); if (!i){return "";} } } } return ""; } /// Returns the first byte of this DTSC value, or 0 on error. char Scan::getType() const{ if (!p){return 0;} return p[0]; } /// Returns the boolean value of this DTSC value. /// Numbers are compared to 0. /// Strings are checked for non-zero length. /// Objects and arrays are checked for content. /// Returns false on error or in other cases. bool Scan::asBool() const{ switch (getType()){ case DTSC_STR: return (p[1] | p[2] | p[3] | p[4]); case DTSC_INT: return (asInt() != 0); case DTSC_OBJ: case DTSC_CON: case DTSC_ARR: return (p[1] | p[2]); default: return false; } } /// Returns the long long value of this DTSC number value. /// Will convert string values to numbers, taking octal and hexadecimal types into account. /// Illegal or invalid values return 0. int64_t Scan::asInt() const{ switch (getType()){ case DTSC_INT: return Bit::btohll(p + 1); case DTSC_STR: char *str; size_t strlen; getString(str, strlen); if (!strlen){return 0;} return strtoll(str, 0, 0); default: return 0; } } /// Returns the string value of this DTSC string value. /// Uses getString internally, if a string. /// Converts integer values to strings. /// Returns an empty string on error. std::string Scan::asString() const{ switch (getType()){ case DTSC_INT:{ std::stringstream st; st << asInt(); return st.str(); }break; case DTSC_STR:{ char *str; size_t strlen; getString(str, strlen); return std::string(str, strlen); }break; } return ""; } /// Sets result to a pointer to the string, and strlen to the length of it. /// Sets both to zero if this isn't a DTSC string value. /// Attempts absolutely no conversion. void Scan::getString(char *&result, size_t &strlen) const{ if (getType() == DTSC_STR){ result = p + 5; strlen = Bit::btohl(p + 1); return; } result = 0; strlen = 0; } /// Returns the DTSC scan object as a JSON value /// Returns an empty object on error. JSON::Value Scan::asJSON() const{ JSON::Value result; unsigned int i = 0; JSON::fromDTMI(p, len, i, result); return result; } /// \todo Move this function to some generic area. Duplicate from json.cpp static inline char hex2c(char c){ if (c < 10){return '0' + c;} if (c < 16){return 'A' + (c - 10);} return '0'; } /// \todo Move this function to some generic area. Duplicate from json.cpp static std::string string_escape(const std::string val){ std::stringstream out; out << "\""; for (size_t i = 0; i < val.size(); ++i){ switch (val.data()[i]){ case '"': out << "\\\""; break; case '\\': out << "\\\\"; break; case '\n': out << "\\n"; break; case '\b': out << "\\b"; break; case '\f': out << "\\f"; break; case '\r': out << "\\r"; break; case '\t': out << "\\t"; break; default: if (val.data()[i] < 32 || val.data()[i] > 126){ out << "\\u00"; out << hex2c((val.data()[i] >> 4) & 0xf); out << hex2c(val.data()[i] & 0xf); }else{ out << val.data()[i]; } break; } } out << "\""; return out.str(); } std::string Scan::toPrettyString(size_t indent) const{ switch (getType()){ case DTSC_STR:{ uint32_t strlen = Bit::btohl(p + 1); if (strlen > 250){ std::stringstream ret; ret << "\"" << strlen << " bytes of data\""; return ret.str(); } return string_escape(asString()); } case DTSC_INT:{ std::stringstream ret; ret << asInt(); return ret.str(); } case DTSC_OBJ: case DTSC_CON:{ std::stringstream ret; ret << "{" << std::endl; indent += 2; char *i = p + 1; bool first = true; // object, scan contents while (i[0] + i[1] != 0 && i < p + len){// while not encountering 0x0000 (we assume // 0x0000EE) if (i + 2 >= p + len){ indent -= 2; ret << std::string(indent, ' ') << "}//walked out of object here"; return ret.str(); } if (!first){ret << "," << std::endl;} first = false; uint16_t strlen = Bit::btohs(i); i += 2; ret << std::string(indent, ' ') << "\"" << std::string(i, strlen) << "\": " << Scan(i + strlen, len - (i - p)).toPrettyString(indent); i = skipDTSC(i + strlen, p + len); if (!i){ indent -= 2; ret << std::string(indent, ' ') << "}//could not locate next object"; return ret.str(); } } indent -= 2; ret << std::endl << std::string(indent, ' ') << "}"; return ret.str(); } case DTSC_ARR:{ std::stringstream ret; ret << "[" << std::endl; indent += 2; Scan tmpScan; unsigned int i = 0; bool first = true; do{ tmpScan = getIndice(i++); if (tmpScan.getType()){ if (!first){ret << "," << std::endl;} first = false; ret << std::string(indent, ' ') << tmpScan.toPrettyString(indent); } }while (tmpScan.getType()); indent -= 2; ret << std::endl << std::string(indent, ' ') << "]"; return ret.str(); } default: return "Error"; } } /// Initialize metadata from referenced DTSC::Scan object in master mode. Meta::Meta(const std::string &_streamName, const DTSC::Scan &src){ version = DTSH_VERSION; streamMemBuf = 0; isMemBuf = false; isMaster = true; reInit(_streamName, src); } /// Initialize empty metadata, in master or slave mode. /// If stream name is empty, slave mode is enforced. Meta::Meta(const std::string &_streamName, bool master){ if (!_streamName.size()){master = false;} version = DTSH_VERSION; streamMemBuf = 0; isMemBuf = false; isMaster = master; reInit(_streamName, master); } /// Initialize metadata from given DTSH file in master mode. Meta::Meta(const std::string &_streamName, const std::string &fileName){ version = DTSH_VERSION; streamMemBuf = 0; isMemBuf = false; isMaster = true; reInit(_streamName, fileName); } void Meta::setMaster(bool _master){isMaster = _master;} bool Meta::getMaster() const{return isMaster;} /// Calls clear(), then initializes freshly. /// If stream name is set, uses shared memory backing. /// If stream name is empty, uses non-shared memory backing. void Meta::reInit(const std::string &_streamName, bool master){ clear(); if (_streamName == ""){ sBufMem(); }else{ sBufShm(_streamName, DEFAULT_TRACK_COUNT, master); } streamInit(); } /// Calls clear(), then initializes from given DTSH file in master mode. /// Internally calls reInit(const std::string&, const DTSC::Scan&). /// If stream name is set, uses shared memory backing. /// If stream name is empty, uses non-shared memory backing. void Meta::reInit(const std::string &_streamName, const std::string &fileName){ clear(); ///\todo Implement absence of keysizes here instead of input::parseHeader std::ifstream inFile(fileName.c_str()); if (!inFile.is_open()){return;} inFile.seekg(0, std::ios_base::end); size_t fileSize = inFile.tellg(); inFile.seekg(0, std::ios_base::beg); char *scanBuf = (char *)malloc(fileSize); inFile.read(scanBuf, fileSize); inFile.close(); DTSC::Packet pkt(scanBuf, fileSize, true); reInit(_streamName, pkt.getScan()); free(scanBuf); } /// Calls clear(), then initializes from the given DTSC:Scan object in master mode. /// If stream name is set, uses shared memory backing. /// If stream name is empty, uses non-shared memory backing. void Meta::reInit(const std::string &_streamName, const DTSC::Scan &src){ clear(); if (_streamName == ""){ sBufMem(); }else{ sBufShm(_streamName, DEFAULT_TRACK_COUNT, true); } streamInit(); setVod(src.hasMember("vod") && src.getMember("vod").asInt()); setLive(src.hasMember("live") && src.getMember("live").asInt()); version = src.getMember("version").asInt(); if (src.hasMember("inputLocalVars")){ inputLocalVars = JSON::fromString(src.getMember("inputLocalVars").asString()); } size_t tNum = src.getMember("tracks").getSize(); for (int i = 0; i < tNum; i++){ addTrackFrom(src.getMember("tracks").getIndice(i)); } // Unix Time at zero point of a stream if (src.hasMember("unixzero")){ setBootMsOffset(src.getMember("unixzero").asInt() - Util::unixMS() + Util::bootMS()); }else{ MEDIUM_MSG("No member \'unixzero\' found in DTSC::Scan. Calculating locally."); int64_t lastMs = 0; for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++){ if (it->second.track.getInt(it->second.trackLastmsField) > lastMs){ lastMs = it->second.track.getInt(it->second.trackLastmsField); } } setBootMsOffset(Util::bootMS() - lastMs); } } void Meta::addTrackFrom(const DTSC::Scan &trak){ char *fragStor = 0; char *keyStor = 0; char *partStor = 0; char *keySizeStor = 0; size_t fragLen = 0; size_t keyLen = 0; size_t partLen = 0; size_t keySizeLen = 0; uint32_t fragCount = DEFAULT_FRAGMENT_COUNT; uint32_t keyCount = DEFAULT_KEY_COUNT; uint32_t partCount = DEFAULT_PART_COUNT; if (trak.hasMember("fragments") && trak.hasMember("keys") && trak.hasMember("parts") && trak.hasMember("keysizes")){ trak.getMember("fragments").getString(fragStor, fragLen); trak.getMember("keys").getString(keyStor, keyLen); trak.getMember("parts").getString(partStor, partLen); trak.getMember("keysizes").getString(keySizeStor, keySizeLen); fragCount = fragLen / DTSH_FRAGMENT_SIZE; keyCount = keyLen / DTSH_KEY_SIZE; partCount = partLen / DTSH_PART_SIZE; } size_t tIdx = addTrack(fragCount, keyCount, partCount); setType(tIdx, trak.getMember("type").asString()); setCodec(tIdx, trak.getMember("codec").asString()); setInit(tIdx, trak.getMember("init").asString()); setLang(tIdx, trak.getMember("lang").asString()); setID(tIdx, trak.getMember("trackid").asInt()); setFirstms(tIdx, trak.getMember("firstms").asInt()); setLastms(tIdx, trak.getMember("lastms").asInt()); setBps(tIdx, trak.getMember("bps").asInt()); setMaxBps(tIdx, trak.getMember("maxbps").asInt()); setSourceTrack(tIdx, INVALID_TRACK_ID); if (trak.getMember("type").asString() == "video"){ setWidth(tIdx, trak.getMember("width").asInt()); setHeight(tIdx, trak.getMember("height").asInt()); setFpks(tIdx, trak.getMember("fpks").asInt()); }else if (trak.getMember("type").asString() == "audio"){ // rate channels size setRate(tIdx, trak.getMember("rate").asInt()); setChannels(tIdx, trak.getMember("channels").asInt()); setSize(tIdx, trak.getMember("size").asInt()); } //Do not parse any of the more complex data, if any of it is missing. if (!fragLen || !keyLen || !partLen || !keySizeLen){return;} //Ok, we have data, let's parse it, too. Track &s = tracks[tIdx]; uint64_t *vals = (uint64_t *)malloc(4 * fragCount * sizeof(uint64_t)); for (int i = 0; i < fragCount; i++){ char *ptr = fragStor + (i * DTSH_FRAGMENT_SIZE); vals[i] = Bit::btohl(ptr); vals[fragCount + i] = ptr[4]; vals[(2 * fragCount) + i] = Bit::btohl(ptr + 5) - 1; vals[(3 * fragCount) + i] = Bit::btohl(ptr + 9); } s.fragments.setInts("duration", vals, fragCount); s.fragments.setInts("keys", vals + fragCount, fragCount); s.fragments.setInts("firstkey", vals + (2 * fragCount), fragCount); s.fragments.setInts("size", vals + (3 * fragCount), fragCount); s.fragments.addRecords(fragCount); vals = (uint64_t *)realloc(vals, 7 * keyCount * sizeof(uint64_t)); uint64_t totalPartCount = 0; for (int i = 0; i < keyCount; i++){ char *ptr = keyStor + (i * DTSH_KEY_SIZE); vals[i] = Bit::btohll(ptr); vals[keyCount + i] = Bit::btoh24(ptr + 8); vals[(2 * keyCount) + i] = Bit::btohl(ptr + 11); vals[(3 * keyCount) + i] = Bit::btohs(ptr + 15); vals[(4 * keyCount) + i] = Bit::btohll(ptr + 17); vals[(5 * keyCount) + i] = Bit::btohl(keySizeStor + (i * 4)); // NOT WITH ptr!! vals[(6 * keyCount) + i] = totalPartCount; totalPartCount += vals[(3 * keyCount) + i]; } s.keys.setInts("bpos", vals, keyCount); s.keys.setInts("duration", vals + keyCount, keyCount); s.keys.setInts("number", vals + (2 * keyCount), keyCount); s.keys.setInts("parts", vals + (3 * keyCount), keyCount); s.keys.setInts("time", vals + (4 * keyCount), keyCount); s.keys.setInts("size", vals + (5 * keyCount), keyCount); s.keys.setInts("firstpart", vals + (6 * keyCount), keyCount); s.keys.addRecords(keyCount); vals = (uint64_t *)realloc(vals, 3 * partCount * sizeof(uint64_t)); for (int i = 0; i < partCount; i++){ char *ptr = partStor + (i * DTSH_PART_SIZE); vals[i] = Bit::btoh24(ptr); vals[partCount + i] = Bit::btoh24(ptr + 3); vals[(2 * partCount) + i] = Bit::btoh24(ptr + 6); } s.parts.setInts("size", vals, partCount); s.parts.setInts("duration", vals + partCount, partCount); s.parts.setInts("offset", vals + (2 * partCount), partCount); s.parts.addRecords(partCount); free(vals); } /// Simply calls clear() Meta::~Meta(){clear();} /// Switches the object to non-shared memory backed mode, with enough room for the given track /// count. Should not be called repeatedly, nor to switch modes. void Meta::sBufMem(size_t trackCount){ size_t bufferSize = META_META_OFFSET + META_TRACK_OFFSET + META_META_RECORDSIZE + (trackCount * META_TRACK_RECORDSIZE); isMemBuf = true; streamMemBuf = (char *)malloc(bufferSize); memset(streamMemBuf, 0, bufferSize); stream = Util::RelAccX(streamMemBuf, false); } /// Initializes shared memory backed mode, with enough room for the given track count. /// Should not be called repeatedly, nor to switch modes. void Meta::sBufShm(const std::string &_streamName, size_t trackCount, bool master){ isMaster = master; if (isMaster){HIGH_MSG("Creating meta page for stream %s", _streamName.c_str());} size_t bufferSize = META_META_OFFSET + META_TRACK_OFFSET + META_META_RECORDSIZE + (trackCount * META_TRACK_RECORDSIZE); isMemBuf = false; streamName = _streamName; char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, streamName.c_str()); if (master){ streamPage.init(pageName, bufferSize, false, false); if (streamPage.mapped){ FAIL_MSG("Re-claiming page %s", pageName); BACKTRACE; }else{ streamPage.init(pageName, bufferSize, true); } streamPage.master = false; stream = Util::RelAccX(streamPage.mapped, false); }else{ streamPage.init(pageName, bufferSize, false, true); if (!streamPage.mapped){ INFO_MSG("Page %s not found", pageName); return; } stream = Util::RelAccX(streamPage.mapped, true); } } /// In master mode, creates and stores the fields for the "stream" child object. /// In slave mode, simply class refresh(). /// Regardless, afterwards the internal RelAccXFieldData members are updated with their correct /// values. void Meta::streamInit(size_t trackCount){ if (isMaster){ ///\todo Add safety for non-initialized stream object; stream.addField("vod", RAX_UINT); stream.addField("live", RAX_UINT); stream.addField("tracks", RAX_NESTED, META_TRACK_OFFSET + (trackCount * META_TRACK_RECORDSIZE)); stream.addField("source", RAX_STRING, 512); stream.addField("maxkeepaway", RAX_16UINT); stream.addField("bufferwindow", RAX_64UINT); stream.addField("bootmsoffset", RAX_64INT); stream.addField("utcoffset", RAX_64INT); stream.addField("minfragduration", RAX_64UINT); stream.setRCount(1); stream.setReady(); stream.addRecords(1); trackList = Util::RelAccX(stream.getPointer("tracks"), false); trackList.addField("valid", RAX_UINT); trackList.addField("id", RAX_32UINT); trackList.addField("type", RAX_32STRING); trackList.addField("codec", RAX_32STRING); trackList.addField("page", RAX_256STRING); trackList.addField("lastupdate", RAX_64UINT); trackList.addField("pid", RAX_32UINT); trackList.addField("minkeepaway", RAX_64UINT); trackList.addField("sourcetid", RAX_32UINT); trackList.addField("encryption", RAX_256STRING); trackList.addField("ivec", RAX_64UINT); trackList.addField("widevine", RAX_256STRING); trackList.addField("playready", RAX_STRING, 1024); trackList.setRCount(trackCount); trackList.setReady(); }else{ refresh(); } // Initialize internal bufferFields streamVodField = stream.getFieldData("vod"); streamLiveField = stream.getFieldData("live"); streamSourceField = stream.getFieldData("source"); streamMaxKeepAwayField = stream.getFieldData("maxkeepaway"); streamBufferWindowField = stream.getFieldData("bufferwindow"); streamBootMsOffsetField = stream.getFieldData("bootmsoffset"); streamUTCOffsetField = stream.getFieldData("utcoffset"); streamMinimumFragmentDurationField = stream.getFieldData("minfragduration"); trackValidField = trackList.getFieldData("valid"); trackIdField = trackList.getFieldData("id"); trackTypeField = trackList.getFieldData("type"); trackCodecField = trackList.getFieldData("codec"); trackPageField = trackList.getFieldData("page"); trackLastUpdateField = trackList.getFieldData("lastupdate"); trackPidField = trackList.getFieldData("pid"); trackMinKeepAwayField = trackList.getFieldData("minkeepaway"); trackSourceTidField = trackList.getFieldData("sourcetid"); trackEncryptionField = trackList.getFieldData("encryption"); trackIvecField = trackList.getFieldData("ivec"); trackWidevineField = trackList.getFieldData("widevine"); trackPlayreadyField = trackList.getFieldData("playready"); } /// Reads the "tracks" field from the "stream" child object, populating the "tracks" variable. /// Does not clear "tracks" beforehand, so it may contain stale information afterwards if it was /// already populated. void Meta::refresh(){ if (!stream.isReady() || !stream.getPointer("tracks")){ INFO_MSG("No track pointer, not refreshing."); return; } trackList = Util::RelAccX(stream.getPointer("tracks"), false); for (size_t i = 0; i < trackList.getPresent(); i++){ if (trackList.getInt("valid", i) == 0){continue;} if (tracks.count(i)){continue;} IPC::sharedPage &p = tM[i]; p.init(trackList.getPointer("page", i), SHM_STREAM_TRACK_LEN, false, false); Track &t = tracks[i]; t.track = Util::RelAccX(p.mapped, true); t.parts = Util::RelAccX(t.track.getPointer("parts"), true); t.keys = Util::RelAccX(t.track.getPointer("keys"), true); t.fragments = Util::RelAccX(t.track.getPointer("fragments"), true); t.pages = Util::RelAccX(t.track.getPointer("pages"), true); t.trackIdField = t.track.getFieldData("id"); t.trackTypeField = t.track.getFieldData("type"); t.trackCodecField = t.track.getFieldData("codec"); t.trackFirstmsField = t.track.getFieldData("firstms"); t.trackLastmsField = t.track.getFieldData("lastms"); t.trackBpsField = t.track.getFieldData("bps"); t.trackMaxbpsField = t.track.getFieldData("maxbps"); t.trackLangField = t.track.getFieldData("lang"); t.trackInitField = t.track.getFieldData("init"); t.trackRateField = t.track.getFieldData("rate"); t.trackSizeField = t.track.getFieldData("size"); t.trackChannelsField = t.track.getFieldData("channels"); t.trackWidthField = t.track.getFieldData("width"); t.trackHeightField = t.track.getFieldData("height"); t.trackFpksField = t.track.getFieldData("fpks"); t.trackMissedFragsField = t.track.getFieldData("missedFrags"); t.partSizeField = t.parts.getFieldData("size"); t.partDurationField = t.parts.getFieldData("duration"); t.partOffsetField = t.parts.getFieldData("offset"); t.keyFirstPartField = t.keys.getFieldData("firstpart"); t.keyBposField = t.keys.getFieldData("bpos"); t.keyDurationField = t.keys.getFieldData("duration"); t.keyNumberField = t.keys.getFieldData("number"); t.keyPartsField = t.keys.getFieldData("parts"); t.keyTimeField = t.keys.getFieldData("time"); t.keySizeField = t.keys.getFieldData("size"); t.fragmentDurationField = t.fragments.getFieldData("duration"); t.fragmentKeysField = t.fragments.getFieldData("keys"); t.fragmentFirstKeyField = t.fragments.getFieldData("firstkey"); t.fragmentSizeField = t.fragments.getFieldData("size"); } } /// Reloads shared memory pages that are marked as needing an update, if any /// Returns true if a reload happened bool Meta::reloadReplacedPagesIfNeeded(){ if (isMemBuf){return false;}//Only for shm-backed metadata if (!stream.isReady() || !stream.getPointer("tracks")){ INFO_MSG("No track pointer, not refreshing."); return false; } char pageName[NAME_BUFFER_SIZE]; if (stream.isReload() || stream.isExit()){ INFO_MSG("Reloading entire metadata"); streamPage.close(); snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, streamName.c_str()); streamPage.init(pageName, 0, false, true); if (!streamPage.mapped){ INFO_MSG("Page %s not found", pageName); return true; } stream = Util::RelAccX(streamPage.mapped, true); tM.clear(); tracks.clear(); refresh(); return true; } bool ret = false; for (size_t i = 0; i < trackList.getPresent(); i++){ if (trackList.getInt("valid", i) == 0){continue;} bool always_load = !tracks.count(i); if (always_load || tracks[i].track.isReload()){ ret = true; Track &t = tracks[i]; if (always_load){ VERYHIGH_MSG("Loading track: %s", trackList.getPointer("page", i)); }else{ VERYHIGH_MSG("Reloading track: %s", trackList.getPointer("page", i)); } IPC::sharedPage &p = tM[i]; p.init(trackList.getPointer("page", i), SHM_STREAM_TRACK_LEN, false, false); if (!p.mapped){ WARN_MSG("Failed to load page %s, retrying later", trackList.getPointer("page", i)); tM.erase(i); tracks.erase(i); continue; } t.track = Util::RelAccX(p.mapped, true); t.parts = Util::RelAccX(t.track.getPointer("parts"), true); t.keys = Util::RelAccX(t.track.getPointer("keys"), true); t.fragments = Util::RelAccX(t.track.getPointer("fragments"), true); t.pages = Util::RelAccX(t.track.getPointer("pages"), true); t.trackIdField = t.track.getFieldData("id"); t.trackTypeField = t.track.getFieldData("type"); t.trackCodecField = t.track.getFieldData("codec"); t.trackFirstmsField = t.track.getFieldData("firstms"); t.trackLastmsField = t.track.getFieldData("lastms"); t.trackBpsField = t.track.getFieldData("bps"); t.trackMaxbpsField = t.track.getFieldData("maxbps"); t.trackLangField = t.track.getFieldData("lang"); t.trackInitField = t.track.getFieldData("init"); t.trackRateField = t.track.getFieldData("rate"); t.trackSizeField = t.track.getFieldData("size"); t.trackChannelsField = t.track.getFieldData("channels"); t.trackWidthField = t.track.getFieldData("width"); t.trackHeightField = t.track.getFieldData("height"); t.trackFpksField = t.track.getFieldData("fpks"); t.trackMissedFragsField = t.track.getFieldData("missedFrags"); t.partSizeField = t.parts.getFieldData("size"); t.partDurationField = t.parts.getFieldData("duration"); t.partOffsetField = t.parts.getFieldData("offset"); t.keyFirstPartField = t.keys.getFieldData("firstpart"); t.keyBposField = t.keys.getFieldData("bpos"); t.keyDurationField = t.keys.getFieldData("duration"); t.keyNumberField = t.keys.getFieldData("number"); t.keyPartsField = t.keys.getFieldData("parts"); t.keyTimeField = t.keys.getFieldData("time"); t.keySizeField = t.keys.getFieldData("size"); t.fragmentDurationField = t.fragments.getFieldData("duration"); t.fragmentKeysField = t.fragments.getFieldData("keys"); t.fragmentFirstKeyField = t.fragments.getFieldData("firstkey"); t.fragmentSizeField = t.fragments.getFieldData("size"); } } return ret; } /// Merges in track information from a given DTSC::Meta object, optionally deleting missing tracks /// and optionally making hard copies of the original data. void Meta::merge(const DTSC::Meta &M, bool deleteTracks, bool copyData){ std::set editedTracks; std::set newTracks = M.getValidTracks(); // Detect new tracks for (std::set::iterator it = newTracks.begin(); it != newTracks.end(); it++){ if (trackIDToIndex(M.getID(*it), getpid()) == INVALID_TRACK_ID){editedTracks.insert(*it);} } for (std::set::iterator it = editedTracks.begin(); it != editedTracks.end(); it++){ size_t fragCount = DEFAULT_FRAGMENT_COUNT; size_t keyCount = DEFAULT_KEY_COUNT; size_t partCount = DEFAULT_PART_COUNT; size_t pageCount = DEFAULT_PAGE_COUNT; if (copyData){ fragCount = M.tracks.at(*it).fragments.getRCount(); keyCount = M.tracks.at(*it).keys.getRCount(); partCount = M.tracks.at(*it).parts.getRCount(); pageCount = M.tracks.at(*it).pages.getRCount(); } size_t newIdx = addTrack(fragCount, keyCount, partCount, pageCount); setInit(newIdx, M.getInit(*it)); setID(newIdx, M.getID(*it)); setChannels(newIdx, M.getChannels(*it)); setRate(newIdx, M.getRate(*it)); setWidth(newIdx, M.getWidth(*it)); setHeight(newIdx, M.getHeight(*it)); setSize(newIdx, M.getSize(*it)); setType(newIdx, M.getType(*it)); setCodec(newIdx, M.getCodec(*it)); setLang(newIdx, M.getLang(*it)); if (copyData){ setFirstms(newIdx, M.getFirstms(*it)); setLastms(newIdx, M.getLastms(*it)); }else{ setFirstms(newIdx, 0); setLastms(newIdx, 0); } setBps(newIdx, M.getBps(*it)); setMaxBps(newIdx, M.getMaxBps(*it)); setFpks(newIdx, M.getFpks(*it)); setMissedFragments(newIdx, M.getMissedFragments(*it)); setMinKeepAway(newIdx, M.getMinKeepAway(*it)); setSourceTrack(newIdx, M.getSourceTrack(*it)); setEncryption(newIdx, M.getEncryption(*it)); setPlayReady(newIdx, M.getPlayReady(*it)); setWidevine(newIdx, M.getWidevine(*it)); setIvec(newIdx, M.getIvec(*it)); if (copyData){tracks[newIdx].track.flowFrom(M.tracks.at(*it).track);} } if (deleteTracks){ editedTracks.clear(); std::set validTracks = getValidTracks(); for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ if (M.trackIDToIndex(getID(*it), getpid()) == INVALID_TRACK_ID){ editedTracks.insert(*it); } } for (std::set::iterator it = editedTracks.begin(); it != editedTracks.end(); it++){ removeTrack(*it); } } } /// Evaluates to true if this is a shared-memory-backed object, correctly mapped, with a non-exit /// state on the "stream" RelAccX page. Meta::operator bool() const{return !isMemBuf && streamPage.mapped && !stream.isExit();} /// Intended to be used for encryption. Not currently called anywhere. size_t Meta::addCopy(size_t sourceTrack){ if (isMemBuf){ WARN_MSG("Unsupported operation for in-memory streams"); return INVALID_TRACK_ID; } size_t tNumber = trackList.getPresent(); Track &t = tracks[tNumber]; char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_TM, streamName.c_str(), getpid(), tNumber); INFO_MSG("Allocating page %s", pageName); tM[tNumber].init(pageName, tM[sourceTrack].len, true); tM[tNumber].master = false; memcpy(tM[tNumber].mapped, tM[sourceTrack].mapped, tM[sourceTrack].len); t.track = Util::RelAccX(tM[tNumber].mapped, true); t.parts = Util::RelAccX(t.track.getPointer("parts"), true); t.keys = Util::RelAccX(t.track.getPointer("keys"), true); t.fragments = Util::RelAccX(t.track.getPointer("fragments"), true); t.pages = Util::RelAccX(t.track.getPointer("pages"), true); trackList.setString(trackPageField, pageName, tNumber); trackList.setInt(trackPidField, getpid(), tNumber); trackList.setInt(trackSourceTidField, sourceTrack, tNumber); trackList.addRecords(1); validateTrack(tNumber); return tNumber; } /// Resizes a given track to be able to hold the given amount of fragments, keys, parts and pages. /// Currently called exclusively from Meta::update(), to resize the internal structures. void Meta::resizeTrack(size_t source, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount, const char * reason){ IPC::semaphore resizeLock; if (!isMemBuf){ std::string pageName = "/"; pageName += trackList.getPointer(trackPageField, source); resizeLock.open(pageName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); resizeLock.wait(); } size_t pageSize = (isMemBuf ? sizeMemBuf[source] : tM[source].len); char *orig = (char *)malloc(pageSize); if (!orig){ FAIL_MSG("Failed to re-allocate memory for track %zu: %s", source, strerror(errno)); return; } memcpy(orig, (isMemBuf ? tMemBuf[source] : tM[source].mapped), pageSize); Track &t = tracks[source]; t.track.setReload(); size_t newPageSize = TRACK_TRACK_OFFSET + TRACK_TRACK_RECORDSIZE + (TRACK_FRAGMENT_OFFSET + (TRACK_FRAGMENT_RECORDSIZE * fragCount)) + (TRACK_KEY_OFFSET + (TRACK_KEY_RECORDSIZE * keyCount)) + (TRACK_PART_OFFSET + (TRACK_PART_RECORDSIZE * partCount)) + (TRACK_PAGE_OFFSET + (TRACK_PAGE_RECORDSIZE * pageCount)); if (isMemBuf){ free(tMemBuf[source]); tMemBuf.erase(source); tMemBuf[source] = (char *)malloc(newPageSize); if (!tMemBuf[source]){ FAIL_MSG("Failed to re-allocate memory for track %zu: %s", source, strerror(errno)); resizeLock.unlink(); return; } sizeMemBuf[source] = newPageSize; memset(tMemBuf[source], 0, newPageSize); t.track = Util::RelAccX(tMemBuf[source], false); }else{ tM[source].master = true; tM[source].init(trackList.getPointer(trackPageField, source), newPageSize, true); if (!tM[source].mapped){ FAIL_MSG("Failed to re-allocate shared memory for track %zu: %s", source, strerror(errno)); resizeLock.unlink(); return; } tM[source].master = false; t.track = Util::RelAccX(tM[source].mapped, false); } initializeTrack(t, fragCount, keyCount, partCount, pageCount); Util::RelAccX origAccess(orig); Util::RelAccX origFragments(origAccess.getPointer("fragments")); Util::RelAccX origKeys(origAccess.getPointer("keys")); Util::RelAccX origParts(origAccess.getPointer("parts")); Util::RelAccX origPages(origAccess.getPointer("pages")); MEDIUM_MSG("Track %zu resizing (reason: %s): frags %" PRIu32 "->%zu, keys %" PRIu32 "->%zu, parts %" PRIu32 "->%zu, pages %" PRIu32 "->%zu", source, reason, origFragments.getRCount(), fragCount, origKeys.getRCount(), keyCount, origParts.getRCount(), partCount, origPages.getRCount(), pageCount); t.track.setInt(t.trackIdField, origAccess.getInt("id")); t.track.setString(t.trackTypeField, origAccess.getPointer("type")); t.track.setString(t.trackCodecField, origAccess.getPointer("codec")); t.track.setInt(t.trackFirstmsField, origAccess.getInt("firstms")); t.track.setInt(t.trackLastmsField, origAccess.getInt("lastms")); t.track.setInt(t.trackBpsField, origAccess.getInt("bps")); t.track.setInt(t.trackMaxbpsField, origAccess.getInt("maxbps")); t.track.setString(t.trackLangField, origAccess.getPointer("lang")); memcpy(t.track.getPointer(t.trackInitField), origAccess.getPointer("init"), 1024 * 1024); t.track.setInt(t.trackRateField, origAccess.getInt("rate")); t.track.setInt(t.trackSizeField, origAccess.getInt("size")); t.track.setInt(t.trackChannelsField, origAccess.getInt("channels")); t.track.setInt(t.trackWidthField, origAccess.getInt("width")); t.track.setInt(t.trackHeightField, origAccess.getInt("height")); t.track.setInt(t.trackFpksField, origAccess.getInt("fpks")); t.track.setInt(t.trackMissedFragsField, origAccess.getInt("missedFrags")); t.parts.setEndPos(origParts.getEndPos()); t.parts.setStartPos(origParts.getStartPos()); t.parts.setDeleted(origParts.getDeleted()); t.parts.setPresent(origParts.getPresent()); Util::FieldAccX origPartSizeAccX = origParts.getFieldAccX("size"); Util::FieldAccX origPartDurationAccX = origParts.getFieldAccX("duration"); Util::FieldAccX origPartOffsetAccX = origParts.getFieldAccX("offset"); Util::FieldAccX partSizeAccX = t.parts.getFieldAccX("size"); Util::FieldAccX partDurationAccX = t.parts.getFieldAccX("duration"); Util::FieldAccX partOffsetAccX = t.parts.getFieldAccX("offset"); size_t firstPart = origParts.getStartPos(); size_t endPart = origParts.getEndPos(); for (size_t i = firstPart; i < endPart; i++){ partSizeAccX.set(origPartSizeAccX.uint(i), i); partDurationAccX.set(origPartDurationAccX.uint(i), i); partOffsetAccX.set(origPartOffsetAccX.uint(i), i); } t.keys.setEndPos(origKeys.getEndPos()); t.keys.setStartPos(origKeys.getStartPos()); t.keys.setDeleted(origKeys.getDeleted()); t.keys.setPresent(origKeys.getPresent()); Util::FieldAccX origKeyFirstpartAccX = origKeys.getFieldAccX("firstpart"); Util::FieldAccX origKeyBposAccX = origKeys.getFieldAccX("bpos"); Util::FieldAccX origKeyDurationAccX = origKeys.getFieldAccX("duration"); Util::FieldAccX origKeyNumberAccX = origKeys.getFieldAccX("number"); Util::FieldAccX origKeyPartsAccX = origKeys.getFieldAccX("parts"); Util::FieldAccX origKeyTimeAccX = origKeys.getFieldAccX("time"); Util::FieldAccX origKeySizeAccX = origKeys.getFieldAccX("size"); Util::FieldAccX keyFirstpartAccX = t.keys.getFieldAccX("firstpart"); Util::FieldAccX keyBposAccX = t.keys.getFieldAccX("bpos"); Util::FieldAccX keyDurationAccX = t.keys.getFieldAccX("duration"); Util::FieldAccX keyNumberAccX = t.keys.getFieldAccX("number"); Util::FieldAccX keyPartsAccX = t.keys.getFieldAccX("parts"); Util::FieldAccX keyTimeAccX = t.keys.getFieldAccX("time"); Util::FieldAccX keySizeAccX = t.keys.getFieldAccX("size"); size_t firstKey = origKeys.getStartPos(); size_t endKey = origKeys.getEndPos(); for (size_t i = firstKey; i < endKey; i++){ keyFirstpartAccX.set(origKeyFirstpartAccX.uint(i), i); keyBposAccX.set(origKeyBposAccX.uint(i), i); keyDurationAccX.set(origKeyDurationAccX.uint(i), i); keyNumberAccX.set(origKeyNumberAccX.uint(i), i); keyPartsAccX.set(origKeyPartsAccX.uint(i), i); keyTimeAccX.set(origKeyTimeAccX.uint(i), i); keySizeAccX.set(origKeySizeAccX.uint(i), i); } t.fragments.setEndPos(origFragments.getEndPos()); t.fragments.setStartPos(origFragments.getStartPos()); t.fragments.setDeleted(origFragments.getDeleted()); t.fragments.setPresent(origFragments.getPresent()); Util::FieldAccX origFragmentDurationAccX = origFragments.getFieldAccX("duration"); Util::FieldAccX origFragmentKeysAccX = origFragments.getFieldAccX("keys"); Util::FieldAccX origFragmentFirstkeyAccX = origFragments.getFieldAccX("firstkey"); Util::FieldAccX origFragmentSizeAccX = origFragments.getFieldAccX("size"); Util::FieldAccX fragmentDurationAccX = t.fragments.getFieldAccX("duration"); Util::FieldAccX fragmentKeysAccX = t.fragments.getFieldAccX("keys"); Util::FieldAccX fragmentFirstkeyAccX = t.fragments.getFieldAccX("firstkey"); Util::FieldAccX fragmentSizeAccX = t.fragments.getFieldAccX("size"); size_t firstFragment = origFragments.getStartPos(); size_t endFragment = origFragments.getEndPos(); for (size_t i = firstFragment; i < endFragment; i++){ fragmentDurationAccX.set(origFragmentDurationAccX.uint(i), i); fragmentKeysAccX.set(origFragmentKeysAccX.uint(i), i); fragmentFirstkeyAccX.set(origFragmentFirstkeyAccX.uint(i), i); fragmentSizeAccX.set(origFragmentSizeAccX.uint(i), i); } t.pages.setEndPos(origPages.getEndPos()); t.pages.setStartPos(origPages.getStartPos()); t.pages.setDeleted(origPages.getDeleted()); t.pages.setPresent(origPages.getPresent()); Util::FieldAccX origPageFirstkeyAccX = origPages.getFieldAccX("firstkey"); Util::FieldAccX origPageKeycountAccX = origPages.getFieldAccX("keycount"); Util::FieldAccX origPagePartsAccX = origPages.getFieldAccX("parts"); Util::FieldAccX origPageSizeAccX = origPages.getFieldAccX("size"); Util::FieldAccX origPageAvailAccX = origPages.getFieldAccX("avail"); Util::FieldAccX origPageFirsttimeAccX = origPages.getFieldAccX("firsttime"); Util::FieldAccX origPageLastkeytimeAccX = origPages.getFieldAccX("lastkeytime"); Util::FieldAccX pageFirstkeyAccX = t.pages.getFieldAccX("firstkey"); Util::FieldAccX pageKeycountAccX = t.pages.getFieldAccX("keycount"); Util::FieldAccX pagePartsAccX = t.pages.getFieldAccX("parts"); Util::FieldAccX pageSizeAccX = t.pages.getFieldAccX("size"); Util::FieldAccX pageAvailAccX = t.pages.getFieldAccX("avail"); Util::FieldAccX pageFirsttimeAccX = t.pages.getFieldAccX("firsttime"); Util::FieldAccX pageLastkeytimeAccX = t.pages.getFieldAccX("lastkeytime"); size_t firstPage = origPages.getStartPos(); size_t endPage = origPages.getEndPos(); for (size_t i = firstPage; i < endPage; i++){ pageFirstkeyAccX.set(origPageFirstkeyAccX.uint(i), i); pageKeycountAccX.set(origPageKeycountAccX.uint(i), i); pagePartsAccX.set(origPagePartsAccX.uint(i), i); pageSizeAccX.set(origPageSizeAccX.uint(i), i); pageAvailAccX.set(origPageAvailAccX.uint(i), i); pageFirsttimeAccX.set(origPageFirsttimeAccX.uint(i), i); pageLastkeytimeAccX.set(origPageLastkeytimeAccX.uint(i), i); } t.track.setReady(); free(orig); resizeLock.unlink(); } size_t Meta::addDelayedTrack(size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){ return addTrack(fragCount, keyCount, partCount, pageCount, false); } /// Adds a track to the metadata structure. /// To be called from the various inputs/outputs whenever they want to add a track. size_t Meta::addTrack(size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount, bool setValid){ char pageName[NAME_BUFFER_SIZE]; IPC::semaphore trackLock; if (!isMemBuf){ snprintf(pageName, NAME_BUFFER_SIZE, SEM_TRACKLIST, streamName.c_str()); trackLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!trackLock){ FAIL_MSG("Could not open semaphore to add track!"); return INVALID_TRACK_ID; } trackLock.wait(); if (stream.isExit()){ trackLock.post(); FAIL_MSG("Not adding track: stream is shutting down"); return INVALID_TRACK_ID; } } size_t pageSize = TRACK_TRACK_OFFSET + TRACK_TRACK_RECORDSIZE + (TRACK_FRAGMENT_OFFSET + (TRACK_FRAGMENT_RECORDSIZE * fragCount)) + (TRACK_KEY_OFFSET + (TRACK_KEY_RECORDSIZE * keyCount)) + (TRACK_PART_OFFSET + (TRACK_PART_RECORDSIZE * partCount)) + (TRACK_PAGE_OFFSET + (TRACK_PAGE_RECORDSIZE * pageCount)); size_t tNumber = trackList.getPresent(); snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_TM, streamName.c_str(), getpid(), tNumber); Track &t = tracks[tNumber]; if (isMemBuf){ tMemBuf[tNumber] = (char *)malloc(pageSize); sizeMemBuf[tNumber] = pageSize; memset(tMemBuf[tNumber], 0, pageSize); t.track = Util::RelAccX(tMemBuf[tNumber], false); }else{ tM[tNumber].init(pageName, pageSize, true); tM[tNumber].master = false; t.track = Util::RelAccX(tM[tNumber].mapped, false); } initializeTrack(t, fragCount, keyCount, partCount, pageCount); t.track.setReady(); trackList.setString(trackPageField, pageName, tNumber); trackList.setInt(trackPidField, getpid(), tNumber); trackList.setInt(trackSourceTidField, INVALID_TRACK_ID, tNumber); trackList.addRecords(1); if (setValid){validateTrack(tNumber, trackValidDefault);} if (!isMemBuf){trackLock.post();} return tNumber; } bool Meta::isClaimed(size_t trackIdx) const{ return (trackList.getInt(trackPidField, trackIdx) != 0); } void Meta::claimTrack(size_t trackIdx){ if (trackList.getInt(trackPidField, trackIdx) != 0){ FAIL_MSG("Cannot claim track: already claimed by PID %" PRIu64, trackList.getInt(trackPidField, trackIdx)); return; } trackList.setInt(trackPidField, getpid(), trackIdx); } void Meta::abandonTrack(size_t trackIdx){ if (trackList.getInt(trackPidField, trackIdx) != getpid()){ FAIL_MSG("Cannot abandon track: is claimed by PID %" PRIu64 ", not us", trackList.getInt(trackPidField, trackIdx)); return; } trackList.setInt(trackPidField, 0, trackIdx); } /// Internal function that is called whenever a track is (re)written to the memory structures. /// Adds the needed fields and sets all the RelAccXFieldData members to point to them. void Meta::initializeTrack(Track &t, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){ t.track.addField("id", RAX_32UINT); t.track.addField("type", RAX_STRING, 8); t.track.addField("codec", RAX_STRING, 8); t.track.addField("firstms", RAX_64UINT); t.track.addField("lastms", RAX_64UINT); t.track.addField("bps", RAX_32UINT); t.track.addField("maxbps", RAX_32UINT); t.track.addField("lang", RAX_STRING, 4); t.track.addField("init", RAX_RAW, 1 * 1024 * 1024); // 1megabyte init data t.track.addField("rate", RAX_16UINT); t.track.addField("size", RAX_16UINT); t.track.addField("channels", RAX_16UINT); t.track.addField("width", RAX_32UINT); t.track.addField("height", RAX_32UINT); t.track.addField("fpks", RAX_16UINT); t.track.addField("missedFrags", RAX_32UINT); t.track.addField("parts", RAX_NESTED, TRACK_PART_OFFSET + (TRACK_PART_RECORDSIZE * partCount)); t.track.addField("keys", RAX_NESTED, TRACK_KEY_OFFSET + (TRACK_KEY_RECORDSIZE * keyCount)); t.track.addField("fragments", RAX_NESTED, TRACK_FRAGMENT_OFFSET + (TRACK_FRAGMENT_RECORDSIZE * fragCount)); t.track.addField("pages", RAX_NESTED, TRACK_PAGE_OFFSET + (TRACK_PAGE_RECORDSIZE * pageCount)); t.track.setRCount(1); t.track.addRecords(1); t.parts = Util::RelAccX(t.track.getPointer("parts"), false); t.parts.addField("size", RAX_32UINT); t.parts.addField("duration", RAX_16UINT); t.parts.addField("offset", RAX_16INT); t.parts.setRCount(partCount); t.parts.setReady(); t.keys = Util::RelAccX(t.track.getPointer("keys"), false); t.keys.addField("firstpart", RAX_64UINT); t.keys.addField("bpos", RAX_64UINT); t.keys.addField("duration", RAX_32UINT); t.keys.addField("number", RAX_32UINT); t.keys.addField("parts", RAX_32UINT); t.keys.addField("time", RAX_64UINT); t.keys.addField("size", RAX_32UINT); t.keys.setRCount(keyCount); t.keys.setReady(); t.fragments = Util::RelAccX(t.track.getPointer("fragments"), false); t.fragments.addField("duration", RAX_32UINT); t.fragments.addField("keys", RAX_16UINT); t.fragments.addField("firstkey", RAX_32UINT); t.fragments.addField("size", RAX_32UINT); t.fragments.setRCount(fragCount); t.fragments.setReady(); t.trackIdField = t.track.getFieldData("id"); t.trackTypeField = t.track.getFieldData("type"); t.trackCodecField = t.track.getFieldData("codec"); t.trackFirstmsField = t.track.getFieldData("firstms"); t.trackLastmsField = t.track.getFieldData("lastms"); t.trackBpsField = t.track.getFieldData("bps"); t.trackMaxbpsField = t.track.getFieldData("maxbps"); t.trackLangField = t.track.getFieldData("lang"); t.trackInitField = t.track.getFieldData("init"); t.trackRateField = t.track.getFieldData("rate"); t.trackSizeField = t.track.getFieldData("size"); t.trackChannelsField = t.track.getFieldData("channels"); t.trackWidthField = t.track.getFieldData("width"); t.trackHeightField = t.track.getFieldData("height"); t.trackFpksField = t.track.getFieldData("fpks"); t.trackMissedFragsField = t.track.getFieldData("missedFrags"); t.partSizeField = t.parts.getFieldData("size"); t.partDurationField = t.parts.getFieldData("duration"); t.partOffsetField = t.parts.getFieldData("offset"); t.keyFirstPartField = t.keys.getFieldData("firstpart"); t.keyBposField = t.keys.getFieldData("bpos"); t.keyDurationField = t.keys.getFieldData("duration"); t.keyNumberField = t.keys.getFieldData("number"); t.keyPartsField = t.keys.getFieldData("parts"); t.keyTimeField = t.keys.getFieldData("time"); t.keySizeField = t.keys.getFieldData("size"); t.fragmentDurationField = t.fragments.getFieldData("duration"); t.fragmentKeysField = t.fragments.getFieldData("keys"); t.fragmentFirstKeyField = t.fragments.getFieldData("firstkey"); t.fragmentSizeField = t.fragments.getFieldData("size"); t.pages = Util::RelAccX(t.track.getPointer("pages"), false); t.pages.addField("firstkey", RAX_32UINT); t.pages.addField("keycount", RAX_32UINT); t.pages.addField("parts", RAX_32UINT); t.pages.addField("size", RAX_32UINT); t.pages.addField("avail", RAX_32UINT); t.pages.addField("firsttime", RAX_64UINT); t.pages.addField("lastkeytime", RAX_64UINT); t.pages.setRCount(pageCount); t.pages.setReady(); } /// Sets the given track's init data. /// Simply calls setInit(size_t, const char *, size_t) using values from the referenced /// std::string. void Meta::setInit(size_t trackIdx, const std::string &init){ setInit(trackIdx, init.data(), init.size()); } /// Sets the given track's init data. void Meta::setInit(size_t trackIdx, const char *init, size_t initLen){ DTSC::Track &t = tracks.at(trackIdx); if (initLen > t.trackInitField.size){ FAIL_MSG("Attempting to store %zu bytes of init data, but we only have room for %" PRIu32 " bytes!", initLen, t.trackInitField.size); initLen = t.trackInitField.size; } char *_init = t.track.getPointer(t.trackInitField); Bit::htobs(_init, initLen); memcpy(_init + 2, init, initLen); } /// Retrieves the given track's init data as std::string. std::string Meta::getInit(size_t idx) const{ const DTSC::Track &t = tracks.at(idx); char *src = t.track.getPointer(t.trackInitField); uint16_t size = Bit::btohs(src); return std::string(src + 2, size); } void Meta::setSource(const std::string &src){stream.setString(streamSourceField, src);} std::string Meta::getSource() const{return stream.getPointer(streamSourceField);} void Meta::setID(size_t trackIdx, size_t id){ trackList.setInt(trackIdField, id, trackIdx); DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackIdField, id); } size_t Meta::getID(size_t trackIdx) const{return trackList.getInt(trackIdField, trackIdx);} /// Writes Util::bootSecs() to the track's last updated field. void Meta::markUpdated(size_t trackIdx){ trackList.setInt(trackLastUpdateField, Util::bootSecs(), trackIdx); } /// Reads the track's last updated field, which should be the Util::bootSecs() value of the time /// of last update. uint64_t Meta::getLastUpdated(size_t trackIdx) const{ return trackList.getInt(trackLastUpdateField, trackIdx); } /// Reads the most recently updated track last updated field, which should be the Util::bootSecs() /// value of the time of last update. uint64_t Meta::getLastUpdated() const{ uint64_t ret = 0; std::set validTracks = getValidTracks(); for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ uint64_t trackUp = getLastUpdated(*it); if (trackUp > ret){ret = trackUp;} } return ret; } void Meta::setChannels(size_t trackIdx, uint16_t channels){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackChannelsField, channels); } uint16_t Meta::getChannels(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackChannelsField); } void Meta::setWidth(size_t trackIdx, uint32_t width){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackWidthField, width); } uint32_t Meta::getWidth(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackWidthField); } void Meta::setHeight(size_t trackIdx, uint32_t height){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackHeightField, height); } uint32_t Meta::getHeight(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackHeightField); } void Meta::setRate(size_t trackIdx, uint32_t rate){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackRateField, rate); } uint32_t Meta::getRate(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackRateField); } void Meta::setSize(size_t trackIdx, uint16_t size){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackSizeField, size); } uint16_t Meta::getSize(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackSizeField); } void Meta::setType(size_t trackIdx, const std::string &type){ trackList.setString(trackTypeField, type, trackIdx); DTSC::Track &t = tracks.at(trackIdx); t.track.setString(t.trackTypeField, type); } std::string Meta::getType(size_t trackIdx) const{ return trackList.getPointer(trackTypeField, trackIdx); } void Meta::setCodec(size_t trackIdx, const std::string &codec){ trackList.setString(trackCodecField, codec, trackIdx); DTSC::Track &t = tracks.at(trackIdx); t.track.setString(t.trackCodecField, codec); } std::string Meta::getCodec(size_t trackIdx) const{ return trackList.getPointer(trackCodecField, trackIdx); } void Meta::setLang(size_t trackIdx, const std::string &lang){ DTSC::Track &t = tracks.at(trackIdx); t.track.setString(t.trackLangField, lang); } std::string Meta::getLang(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); if (!t.track.isReady()){return "";} return t.track.getPointer(t.trackLangField); } void Meta::setFirstms(size_t trackIdx, uint64_t firstms){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackFirstmsField, firstms); } uint64_t Meta::getFirstms(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackFirstmsField); } void Meta::setLastms(size_t trackIdx, uint64_t lastms){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackLastmsField, lastms); } uint64_t Meta::getLastms(size_t trackIdx) const{ const DTSC::Track &t = tracks.find(trackIdx)->second; return t.track.getInt(t.trackLastmsField); } uint64_t Meta::getDuration(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackLastmsField) - t.track.getInt(t.trackFirstmsField); } void Meta::setBps(size_t trackIdx, uint64_t bps){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackBpsField, bps); } uint64_t Meta::getBps(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackBpsField); } void Meta::setMaxBps(size_t trackIdx, uint64_t bps){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackMaxbpsField, bps); } uint64_t Meta::getMaxBps(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackMaxbpsField); } void Meta::setFpks(size_t trackIdx, uint64_t bps){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackFpksField, bps); } uint64_t Meta::getFpks(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackFpksField); } void Meta::setMissedFragments(size_t trackIdx, uint32_t bps){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackMissedFragsField, bps); } uint32_t Meta::getMissedFragments(size_t trackIdx) const{ const DTSC::Track &t = tracks.at(trackIdx); return t.track.getInt(t.trackMissedFragsField); } void Meta::setMinKeepAway(size_t trackIdx, uint64_t minKeepAway){ trackList.setInt(trackMinKeepAwayField, minKeepAway, trackIdx); } uint64_t Meta::getMinKeepAway(size_t trackIdx) const{ return trackList.getInt(trackMinKeepAwayField, trackIdx); } void Meta::setMaxKeepAway(uint64_t maxKeepAway){ stream.setInt(streamMaxKeepAwayField, maxKeepAway); } uint64_t Meta::getMaxKeepAway() const{ return stream.getInt(streamMaxKeepAwayField); } void Meta::setEncryption(size_t trackIdx, const std::string &encryption){ trackList.setString(trackEncryptionField, encryption, trackIdx); } std::string Meta::getEncryption(size_t trackIdx) const{ return trackList.getPointer(trackEncryptionField, trackIdx); } void Meta::setWidevine(size_t trackIdx, const std::string &widevine){ trackList.setString(trackWidevineField, widevine, trackIdx); } std::string Meta::getWidevine(size_t trackIdx) const{ return trackList.getPointer(trackWidevineField, trackIdx); } void Meta::setPlayReady(size_t trackIdx, const std::string &playReady){ trackList.setString(trackPlayreadyField, playReady, trackIdx); } std::string Meta::getPlayReady(size_t trackIdx) const{ return trackList.getPointer(trackPlayreadyField, trackIdx); } void Meta::setIvec(size_t trackIdx, uint64_t ivec){ trackList.setInt(trackIvecField, ivec, trackIdx); } uint64_t Meta::getIvec(size_t trackIdx) const{ return trackList.getInt(trackIvecField, trackIdx); } void Meta::setSourceTrack(size_t trackIdx, size_t sourceTrack){ trackList.setInt(trackSourceTidField, sourceTrack, trackIdx); } uint64_t Meta::getSourceTrack(size_t trackIdx) const{ return trackList.getInt(trackSourceTidField, trackIdx); } void Meta::setVod(bool vod){ stream.setInt(streamVodField, vod ? 1 : 0); } bool Meta::getVod() const{return stream.getInt(streamVodField);} void Meta::setLive(bool live){ stream.setInt(streamLiveField, live ? 1 : 0); } bool Meta::getLive() const{return stream.getInt(streamLiveField);} bool Meta::hasBFrames(size_t idx) const{ std::set vTracks = getValidTracks(); for (std::set::iterator it = vTracks.begin(); it != vTracks.end(); it++){ if (idx != INVALID_TRACK_ID && idx != *it){continue;} if (getType(*it) != "video"){continue;} DTSC::Parts p(parts(*it)); size_t ctr = 0; int64_t prevOffset = 0; bool firstOffset = true; for (size_t i = p.getFirstValid(); i < p.getEndValid(); ++i){ if (firstOffset){ firstOffset = false; prevOffset = p.getOffset(i); } if (p.getOffset(i) != prevOffset){return true;} if (++ctr >= 100){break;} } } return false; } void Meta::setBufferWindow(uint64_t bufferWindow){ stream.setInt(streamBufferWindowField, bufferWindow); } uint64_t Meta::getBufferWindow() const{return stream.getInt(streamBufferWindowField);} void Meta::setBootMsOffset(int64_t bootMsOffset){ DONTEVEN_MSG("Setting streamBootMsOffsetField to %" PRId64, bootMsOffset); stream.setInt(streamBootMsOffsetField, bootMsOffset); } int64_t Meta::getBootMsOffset() const{return stream.getInt(streamBootMsOffsetField);} void Meta::setUTCOffset(int64_t UTCOffset){ stream.setInt(streamUTCOffsetField, UTCOffset); } int64_t Meta::getUTCOffset() const{return stream.getInt(streamUTCOffsetField);} /*LTS-START*/ void Meta::setMinimumFragmentDuration(uint64_t fragmentDuration){ stream.setInt(streamMinimumFragmentDurationField, fragmentDuration); } uint64_t Meta::getMinimumFragmentDuration() const{ uint64_t res = stream.getInt(streamMinimumFragmentDurationField); if (res > 0){return res;} return DEFAULT_FRAGMENT_DURATION; } /*LTS-END*/ std::set Meta::getValidTracks(bool skipEmpty) const{ std::set res; if (!(*this) && !isMemBuf){ INFO_MSG("Shared metadata not ready yet - no tracks valid"); return res; } uint64_t firstValid = trackList.getDeleted(); uint64_t beyondLast = trackList.getEndPos(); for (size_t i = firstValid; i < beyondLast; i++){ if (trackList.getInt(trackValidField, i) & trackValidMask){res.insert(i);} if (trackList.getInt(trackSourceTidField, i) != INVALID_TRACK_ID && std::string(trackList.getPointer(trackEncryptionField, i)) != ""){ res.erase(trackList.getInt(trackSourceTidField, i)); } if (!tracks.count(i) || !tracks.at(i).track.isReady()){res.erase(i);} if (skipEmpty){ if (res.count(i) && !tracks.at(i).parts.getPresent()){res.erase(i);} } } return res; } std::set Meta::getMySourceTracks(size_t pid) const{ std::set res; if (!streamPage.mapped){return res;} uint64_t firstValid = trackList.getDeleted(); uint64_t beyondLast = firstValid + trackList.getPresent(); for (size_t i = firstValid; i < beyondLast; i++){ if (trackList.getInt(trackValidField, i) && trackList.getInt(trackPidField, i) == pid){ res.insert(i); } } return res; } /// Sets the track valid field to 1, also calling markUpdated() void Meta::validateTrack(size_t trackIdx, uint8_t validType){ markUpdated(trackIdx); trackList.setInt(trackValidField, validType, trackIdx); } void Meta::removeEmptyTracks(){ reloadReplacedPagesIfNeeded(); std::set validTracks = getValidTracks(); for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ if (!tracks.at(*it).parts.getPresent()){removeTrack(*it);} } } /// Removes the track from the memory structure and caches. void Meta::removeTrack(size_t trackIdx){ if (!getValidTracks().count(trackIdx)){return;} Track &t = tracks[trackIdx]; for (uint64_t i = t.pages.getDeleted(); i < t.pages.getEndPos(); i++){ if (t.pages.getInt("avail", i) == 0){continue;} char thisPageName[NAME_BUFFER_SIZE]; snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx, (uint32_t)t.pages.getInt("firstkey", i)); IPC::sharedPage p(thisPageName, 20971520); p.master = true; } tM[trackIdx].master = true; tM.erase(trackIdx); tracks.erase(trackIdx); trackList.setInt(trackValidField, 0, trackIdx); } /// Removes the first key from the memory structure and caches. bool Meta::removeFirstKey(size_t trackIdx){ IPC::semaphore resizeLock; if (!isMemBuf){ const char * pageName = trackList.getPointer(trackPageField, trackIdx); resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!resizeLock.tryWait()){ MEDIUM_MSG("Metadata is busy, delaying deletion of key a bit"); resizeLock.close(); return false; } if (reloadReplacedPagesIfNeeded()){ MEDIUM_MSG("Metadata just got replaced, delaying deletion of key a bit"); return false; } } Track &t = tracks[trackIdx]; DONTEVEN_MSG("Deleting parts: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.parts.getDeleted(), t.parts.getDeleted()+t.keys.getInt(t.keyPartsField, t.keys.getDeleted()), t.parts.getPresent()); t.parts.deleteRecords(t.keys.getInt(t.keyPartsField, t.keys.getDeleted())); DONTEVEN_MSG("Deleting key: %" PRIu64 "->%" PRIu64 " del'd, %zu pres", t.keys.getDeleted(), t.keys.getDeleted()+1, t.keys.getPresent()); t.keys.deleteRecords(1); if (t.fragments.getInt(t.fragmentFirstKeyField, t.fragments.getDeleted()) < t.keys.getDeleted()){ t.fragments.deleteRecords(1); setMissedFragments(trackIdx, getMissedFragments(trackIdx) + 1); } if (t.pages.getPresent() > 1 && t.pages.getInt("firstkey", t.pages.getDeleted() + 1) < t.keys.getDeleted()){ // Initialize the correct page, make it master so it gets cleaned up when leaving scope. char thisPageName[NAME_BUFFER_SIZE]; snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx, (uint32_t)t.pages.getInt("firstkey", t.pages.getDeleted())); IPC::sharedPage p(thisPageName, 20971520); p.master = true; // Then delete the page entry t.pages.deleteRecords(1); } setFirstms(trackIdx, t.keys.getInt(t.keyTimeField, t.keys.getDeleted())); if (resizeLock){resizeLock.unlink();} return true; } ///\brief Updates a meta object given a DTSC::Packet with byte position override. void Meta::updatePosOverride(DTSC::Packet &pack, uint64_t bpos){ char *data; size_t dataLen; pack.getString("data", data, dataLen); update(pack.getTime(), pack.getInt("offset"), pack.getTrackId(), dataLen, bpos, pack.getFlag("keyframe"), pack.getDataLen()); } ///\brief Updates a meta object given a DTSC::Packet void Meta::update(const DTSC::Packet &pack){ char *data; size_t dataLen; pack.getString("data", data, dataLen); update(pack.getTime(), pack.getInt("offset"), pack.getTrackId(), dataLen, pack.getInt("bpos"), pack.getFlag("keyframe"), pack.getDataLen()); } /// Helper class that calculates inter-packet jitter class jitterTimer{ public: uint64_t trueTime[8]; // Array of bootMS-based measurement points uint64_t packTime[8]; // Array of corresponding packet times uint64_t curJitter; // Maximum jitter measurement in past 10 seconds unsigned int x; // Current indice within above two arrays uint64_t maxJitter; // Highest jitter ever observed by this jitterTimer uint64_t lastTime; // Last packet used for a measurement point jitterTimer(){ for (int i = 0; i < 8; ++i){ trueTime[i] = 0; packTime[i] = 0; } maxJitter = 200; lastTime = 0; x = 0; } uint64_t addPack(uint64_t t){ if (veryUglyJitterOverride){return veryUglyJitterOverride;} uint64_t curMs = Util::bootMS(); if (!x){ // First call, set the whole array to this packet for (int i = 0; i < 8; ++i){ trueTime[i] = curMs; packTime[i] = t; } ++x; trueTime[x % 8] = curMs; packTime[x % 8] = t; lastTime = t; curJitter = 0; } if (t > lastTime + 2500){ if ((x % 4) == 0){ if (maxJitter > 50 && curJitter < maxJitter - 50){ MEDIUM_MSG("Jitter lowered from %" PRIu64 " to %" PRIu64 " ms", maxJitter, curJitter); maxJitter = curJitter; } curJitter = maxJitter*0.90; } ++x; trueTime[x % 8] = curMs; packTime[x % 8] = t; lastTime = t; } uint64_t realTime = (curMs - trueTime[(x + 1) % 8]); uint64_t arriTime = (t - packTime[(x + 1) % 8]); int64_t jitter = (realTime - arriTime); if (jitter < 0){ // Negative jitter = packets arriving too soon. // This is... ehh... not a bad thing? I guess..? // if (jitter < -1000){ // INFO_MSG("Jitter = %" PRId64 " ms (max: %" PRIu64 ")", jitter, maxJitter); //} }else{ // Positive jitter = packets arriving too late. // We need to delay playback at least by this amount to account for it. if ((uint64_t)jitter > maxJitter){ if (jitter - maxJitter > 420){ INFO_MSG("Jitter increased from %" PRIu64 " to %" PRId64 " ms", maxJitter, jitter); }else{ HIGH_MSG("Jitter increased from %" PRIu64 " to %" PRId64 " ms", maxJitter, jitter); } maxJitter = (uint64_t)jitter; } if (curJitter < (uint64_t)jitter){curJitter = (uint64_t)jitter;} } return maxJitter; } }; /// Updates the metadata given the packet's properties. void Meta::update(uint64_t packTime, int64_t packOffset, uint32_t packTrack, uint64_t packDataSize, uint64_t packBytePos, bool isKeyframe, uint64_t packSendSize){ ///\todo warning Re-Implement Ivec if (getLive()){ static std::map theJitters; setMinKeepAway(packTrack, theJitters[packTrack].addPack(packTime)); } DONTEVEN_MSG("Updating meta with: t=%" PRIu64 ", o=%" PRId64 ", s=%" PRIu64 ", t=%" PRIu32 ", p=%" PRIu64, packTime, packOffset, packDataSize, packTrack, packBytePos); if (!packSendSize){ // time and trackID are part of the 20-byte header. // the container object adds 4 bytes (plus 2+namelen for each content, see below) // offset, if non-zero, adds 9 bytes (integer type) and 8 bytes (2+namelen) // bpos, if >= 0, adds 9 bytes (integer type) and 6 bytes (2+namelen) // keyframe, if true, adds 9 bytes (integer type) and 10 bytes (2+namelen) // data adds packDataSize+5 bytes (string type) and 6 bytes (2+namelen) packSendSize = 24 + (packOffset ? 17 : 0) + (packBytePos > 0 ? 15 : 0) + (isKeyframe ? 19 : 0) + packDataSize + 11; } if ((packBytePos > 0) && !getVod()){setVod(true);} size_t tNumber = packTrack; std::map::iterator it = tracks.find(tNumber); if (it == tracks.end()){ ERROR_MSG("Could not buffer packet for track %zu: track not found", tNumber); return; } Track &t = it->second; if (packTime < getLastms(tNumber)){ static bool warned = false; if (!warned){ ERROR_MSG("Received packets for track %zu in wrong order (%" PRIu64 " < %" PRIu64 ") - ignoring! Further messages on HIGH level.", tNumber, packTime, getLastms(tNumber)); warned = true; }else{ HIGH_MSG("Received packets for track %zu in wrong order (%" PRIu64 " < %" PRIu64 ") - ignoring!", tNumber, packTime, getLastms(tNumber)); } return; } uint64_t newPartNum = t.parts.getEndPos(); if ((newPartNum - t.parts.getDeleted()) >= t.parts.getRCount()){ resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount(), t.parts.getRCount() * 2, t.pages.getRCount(), "not enough parts"); } t.parts.setInt(t.partSizeField, packDataSize, newPartNum); t.parts.setInt(t.partOffsetField, packOffset, newPartNum); if (newPartNum){ t.parts.setInt(t.partDurationField, packTime - getLastms(tNumber), newPartNum - 1); t.parts.setInt(t.partDurationField, packTime - getLastms(tNumber), newPartNum); }else{ t.parts.setInt(t.partDurationField, 0, newPartNum); setFirstms(tNumber, packTime); } t.parts.addRecords(1); uint64_t newKeyNum = t.keys.getEndPos(); if (isKeyframe || newKeyNum == 0 || (getType(tNumber) != "video" && packTime >= AUDIO_KEY_INTERVAL && packTime - t.keys.getInt(t.keyTimeField, newKeyNum - 1) >= AUDIO_KEY_INTERVAL)){ if ((newKeyNum - t.keys.getDeleted()) >= t.keys.getRCount()){ resizeTrack(tNumber, t.fragments.getRCount(), t.keys.getRCount() * 2, t.parts.getRCount(), t.pages.getRCount(), "not enough keys"); } t.keys.setInt(t.keyBposField, packBytePos, newKeyNum); t.keys.setInt(t.keyTimeField, packTime, newKeyNum); t.keys.setInt(t.keyPartsField, 0, newKeyNum); t.keys.setInt(t.keyDurationField, 0, newKeyNum); t.keys.setInt(t.keySizeField, 0, newKeyNum); t.keys.setInt(t.keyNumberField, newKeyNum, newKeyNum); if (newKeyNum){ t.keys.setInt(t.keyFirstPartField, t.keys.getInt(t.keyFirstPartField, newKeyNum - 1) + t.keys.getInt(t.keyPartsField, newKeyNum - 1), newKeyNum); // Update duration of previous key too t.keys.setInt(t.keyDurationField, packTime - t.keys.getInt(t.keyTimeField, newKeyNum - 1), newKeyNum - 1); }else{ t.keys.setInt(t.keyFirstPartField, 0, newKeyNum); } t.keys.addRecords(1); t.track.setInt(t.trackFirstmsField, t.keys.getInt(t.keyTimeField, t.keys.getDeleted())); uint64_t newFragNum = t.fragments.getEndPos(); if (newFragNum == 0 || (packTime >= getMinimumFragmentDuration() && (packTime - getMinimumFragmentDuration()) >= t.keys.getInt(t.keyTimeField, t.fragments.getInt(t.fragmentFirstKeyField, newFragNum - 1)))){ if ((newFragNum - t.fragments.getDeleted()) >= t.fragments.getRCount()){ resizeTrack(tNumber, t.fragments.getRCount() * 2, t.keys.getRCount(), t.parts.getRCount(), t.pages.getRCount(), "not enough frags"); } if (newFragNum){ t.fragments.setInt(t.fragmentDurationField, packTime - t.keys.getInt(t.keyTimeField, t.fragments.getInt(t.fragmentFirstKeyField, newFragNum - 1)), newFragNum - 1); uint64_t totalBytes = 0; uint64_t totalDuration = 0; for (size_t fragIdx = t.fragments.getStartPos(); fragIdx < newFragNum; fragIdx++){ totalBytes += t.fragments.getInt(t.fragmentSizeField, fragIdx); totalDuration += t.fragments.getInt(t.fragmentDurationField, fragIdx); } setBps(tNumber, (totalDuration ? (totalBytes * 1000) / totalDuration : 0)); setMaxBps(tNumber, std::max(getMaxBps(tNumber), (t.fragments.getInt(t.fragmentSizeField, newFragNum - 1) * 1000) / t.fragments.getInt(t.fragmentDurationField, newFragNum - 1))); } t.fragments.setInt(t.fragmentFirstKeyField, newKeyNum, newFragNum); t.fragments.setInt(t.fragmentDurationField, 0, newFragNum); t.fragments.setInt(t.fragmentSizeField, 0, newFragNum); t.fragments.setInt(t.fragmentKeysField, 1, newFragNum); t.fragments.setInt(t.fragmentFirstKeyField, t.keys.getInt(t.keyNumberField, newKeyNum), newFragNum); t.fragments.addRecords(1); }else{ t.fragments.setInt(t.fragmentKeysField, t.fragments.getInt(t.fragmentKeysField, newFragNum - 1) + 1, newFragNum - 1); } }else{ uint64_t lastKeyNum = t.keys.getEndPos() - 1; t.keys.setInt(t.keyDurationField, t.keys.getInt(t.keyDurationField, lastKeyNum) + t.parts.getInt(t.partDurationField, newPartNum - 1), lastKeyNum); } uint64_t lastKeyNum = t.keys.getEndPos() - 1; t.keys.setInt(t.keyPartsField, t.keys.getInt(t.keyPartsField, lastKeyNum) + 1, lastKeyNum); t.keys.setInt(t.keySizeField, t.keys.getInt(t.keySizeField, lastKeyNum) + packSendSize, lastKeyNum); uint64_t lastFragNum = t.fragments.getEndPos() - 1; t.fragments.setInt(t.fragmentSizeField, t.fragments.getInt(t.fragmentSizeField, lastFragNum) + packDataSize, lastFragNum); t.track.setInt(t.trackLastmsField, packTime); markUpdated(tNumber); } /// Prints the metadata and tracks in human-readable format std::string Meta::toPrettyString() const{ std::stringstream r; r << "Metadata for stream " << streamName << std::endl; r << stream.toPrettyString(); for (std::map::const_iterator it = tracks.begin(); it != tracks.end(); it++){ r << " Track " << it->first << ": " << it->second.track.toPrettyString() << std::endl; } return r.str(); } /// Loops over the active tracks, returning the index of the track with the given ID for the given /// process. size_t Meta::trackIDToIndex(size_t trackID, size_t pid) const{ for (size_t i = 0; i < trackList.getPresent(); i++){ if (pid && trackList.getInt(trackPidField, i) != pid){continue;} if (trackList.getInt(trackIdField, i) == trackID){return i;} } return INVALID_TRACK_ID; } /// Returns a pretty-printed (optionally unique) name for the given track std::string Meta::getTrackIdentifier(size_t idx, bool unique) const{ std::stringstream result; std::string type = getType(idx); if (type == ""){ result << "metadata_" << idx; return result.str(); } result << type << "_"; result << getCodec(idx) << "_"; if (type == "audio"){ result << getChannels(idx) << "ch_"; result << getRate(idx) << "hz"; }else if (type == "video"){ result << getWidth(idx) << "x" << getHeight(idx) << "_"; result << (double)getFpks(idx) / 1000 << "fps"; } if (getLang(idx) != "" && getLang(idx) != "und"){result << "_" << getLang(idx);} if (unique){result << "_" << idx;} return result.str(); } const Util::RelAccX &Meta::parts(size_t idx) const{return tracks.at(idx).parts;} Util::RelAccX &Meta::keys(size_t idx){return tracks.at(idx).keys;} const Util::RelAccX &Meta::keys(size_t idx) const{return tracks.at(idx).keys;} const Util::RelAccX &Meta::fragments(size_t idx) const{return tracks.at(idx).fragments;} const Util::RelAccX &Meta::pages(size_t idx) const{return tracks.at(idx).pages;} Util::RelAccX &Meta::pages(size_t idx){return tracks.at(idx).pages;} /// Wipes internal structures, also marking as outdated and deleting memory structures if in /// master mode. void Meta::clear(){ if (isMemBuf){ isMemBuf = false; free(streamMemBuf); streamMemBuf = 0; for (std::map::iterator it = tMemBuf.begin(); it != tMemBuf.end(); it++){ free(it->second); } tMemBuf.clear(); sizeMemBuf.clear(); }else if (isMaster){ IPC::semaphore trackLock; if (streamName.size()){ char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SEM_TRACKLIST, streamName.c_str()); trackLock.open(pageName, O_CREAT|O_RDWR, ACCESSPERMS, 1); trackLock.tryWaitOneSecond(); } std::set toRemove; for (std::map::iterator it = tM.begin(); it != tM.end(); it++){ if (!it->second.mapped){continue;} toRemove.insert(it->first); } for (std::set::iterator it = toRemove.begin(); it != toRemove.end(); it++){ removeTrack(*it); } if (streamPage.mapped && stream.isReady()){stream.setExit();} streamPage.master = true; if (streamName.size()){ //Wipe tracklist semaphore. This is not done anywhere else in the codebase. trackLock.unlink(); } } stream = Util::RelAccX(); trackList = Util::RelAccX(); streamPage.close(); tM.clear(); tracks.clear(); isMaster = true; streamName = ""; } /// Makes a minimally-sized copy of the given Meta object. /// Only used internally by the remap() function. void Meta::minimalFrom(const DTSC::Meta &src){ clear(); sBufMem(); streamInit(); stream.flowFrom(src.stream); for (int i = 0; i < src.trackList.getPresent(); i++){ Track &t = tracks[i]; tMemBuf[i] = (char *)malloc(SHM_STREAM_TRACK_LEN); sizeMemBuf[i] = SHM_STREAM_TRACK_LEN; memset(tMemBuf[i], 0, SHM_STREAM_TRACK_LEN); t.track = Util::RelAccX(tMemBuf[i], false); initializeTrack(t); t.track.flowFrom(src.tracks.at(i).track); t.track.setReady(); } } /// Re-maps the current Meta object by making a minimal copy in a temporary object, then flowing /// the current object from the temporary object. Not currently used by anything...? void Meta::remap(const std::string &_streamName){ Meta M; M.minimalFrom(*this); reInit(_streamName.size() ? _streamName : streamName); stream.flowFrom(M.stream); for (size_t i = 0; i < M.trackList.getPresent(); i++){ Track &t = tracks[i]; char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_TM, streamName.c_str(), getpid(), i); tM[i].init(pageName, SHM_STREAM_TRACK_LEN, true); tM[i].master = false; t.track = Util::RelAccX(tM[i].mapped, false); initializeTrack(t); t.track.flowFrom(M.tracks[i].track); t.track.setReady(); } } ///\brief Determines the "packed" size of a Meta object uint64_t Meta::getSendLen(bool skipDynamic, std::set selectedTracks) const{ uint64_t dataLen = 34; // + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; if (getVod()){dataLen += 14;} if (getLive()){dataLen += 15 + 19;} // 19 for unixzero for (std::map::const_iterator it = tracks.begin(); it != tracks.end(); it++){ if (!it->second.parts.getPresent()){continue;} if (!selectedTracks.size() || selectedTracks.count(it->first)){ dataLen += (124 + getInit(it->first).size() + getCodec(it->first).size() + getType(it->first).size() + getTrackIdentifier(it->first, true).size()); if (!skipDynamic){ dataLen += ((it->second.fragments.getPresent() * DTSH_FRAGMENT_SIZE) + 16); dataLen += ((it->second.keys.getPresent() * DTSH_KEY_SIZE) + 11); dataLen += ((it->second.keys.getPresent() * 4) + 15); dataLen += ((it->second.parts.getPresent() * DTSH_PART_SIZE) + 12); // dataLen += ivecs.size() * 8 + 12; /*LTS*/ if (it->second.track.getInt("missedFrags")){dataLen += 23;} } std::string lang = getLang(it->first); if (lang.size() && lang != "und"){dataLen += 11 + lang.size();} if (getType(it->first) == "audio"){ dataLen += 49; }else if (getType(it->first) == "video"){ dataLen += 48; } } } /* if (sourceURI.size()){ dataLen += 13 + sourceURI.size(); } */ return dataLen + 8; // add 8 bytes header } ///\brief Converts a short to a char* inline char *c16(short input){ static char result[2]; Bit::htobs(result, input); return result; } ///\brief Converts a short to a char* inline char *c24(int input){ static char result[3]; Bit::htob24(result, input); return result; } ///\brief Converts an integer to a char* inline char *c32(int input){ static char result[4]; Bit::htobl(result, input); return result; } ///\brief Converts a long long to a char* inline char *c64(long long int input){ static char result[8]; Bit::htobll(result, input); return result; } /// Converts the current Meta object to JSON format void Meta::toJSON(JSON::Value &res, bool skipDynamic, bool tracksOnly) const{ res.null(); if (!skipDynamic){ WARN_MSG("Skipping dynamic stuff even though skipDynamic is set to false"); } uint64_t jitter = 0; bool bframes = false; std::set validTracks = getValidTracks(); for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ JSON::Value &trackJSON = res["tracks"][getTrackIdentifier(*it, true)]; std::string type = getType(*it); trackJSON["codec"] = getCodec(*it); trackJSON["codecstring"] = Util::codecString(getCodec(*it), getInit(*it)); trackJSON["type"] = type; trackJSON["idx"] = (uint64_t)*it; trackJSON["trackid"] = (uint64_t)getID(*it); trackJSON["init"] = getInit(*it); trackJSON["firstms"] = getFirstms(*it); trackJSON["lastms"] = getLastms(*it); trackJSON["bps"] = getBps(*it); trackJSON["maxbps"] = getMaxBps(*it); if (!skipDynamic && getLive()){ if (getMissedFragments(*it)){trackJSON["missed_frags"] = getMissedFragments(*it);} } uint64_t trkJitter = getMinKeepAway(*it); if (trkJitter){ trackJSON["jitter"] = trkJitter; if (trkJitter > jitter){jitter = trkJitter;} } if (getLang(*it) != "" && getLang(*it) != "und"){trackJSON["lang"] = getLang(*it);} if (type == "audio"){ trackJSON["rate"] = getRate(*it); trackJSON["size"] = getSize(*it); trackJSON["channels"] = getChannels(*it); }else if (type == "video"){ trackJSON["width"] = getWidth(*it); trackJSON["height"] = getHeight(*it); trackJSON["fpks"] = getFpks(*it); if (hasBFrames(*it)){ bframes = true; trackJSON["bframes"] = 1; } } } if (tracksOnly){ JSON::Value v = res["tracks"]; res = v; return; } if (jitter){res["jitter"] = jitter;} res["bframes"] = bframes?1:0; if (getMaxKeepAway()){res["maxkeepaway"] = getMaxKeepAway();} if (getLive()){ res["live"] = 1u; }else{ res["vod"] = 1u; } res["version"] = DTSH_VERSION; if (getBufferWindow()){res["buffer_window"] = getBufferWindow();} if (getSource() != ""){res["source"] = getSource();} } /// Writes the current Meta object in DTSH format to the given uri void Meta::toFile(const std::string &uri) const{ // Create writing socket int outFd = -1; if (!Util::externalWriter(uri, outFd, false)){return;} Socket::Connection outFile(outFd, -1); if (outFile){ send(outFile, false, getValidTracks(), false); outFile.close(); } } /// Sends the current Meta object through a socket in DTSH format void Meta::send(Socket::Connection &conn, bool skipDynamic, std::set selectedTracks, bool reID) const{ std::string lVars; size_t lVarSize = 0; if (inputLocalVars.size()){ lVars = inputLocalVars.toString(); lVarSize = 2 + 14 + 5 + lVars.size(); } conn.SendNow(DTSC::Magic_Header, 4); conn.SendNow(c32(lVarSize + getSendLen(skipDynamic, selectedTracks) - 8), 4); conn.SendNow("\340", 1); if (getVod()){conn.SendNow("\000\003vod\001\000\000\000\000\000\000\000\001", 14);} if (getLive()){conn.SendNow("\000\004live\001\000\000\000\000\000\000\000\001", 15);} conn.SendNow("\000\007version\001", 10); conn.SendNow(c64(DTSH_VERSION), 8); if (getLive()){ conn.SendNow("\000\010unixzero\001", 11); conn.SendNow(c64(Util::unixMS() - Util::bootMS() + getBootMsOffset()), 8); } if (lVarSize){ conn.SendNow("\000\016inputLocalVars\002", 17); conn.SendNow(c32(lVars.size()), 4); conn.SendNow(lVars.data(), lVars.size()); } conn.SendNow("\000\006tracks\340", 9); for (std::set::const_iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ std::string tmp = getTrackIdentifier(*it, true); conn.SendNow(c16(tmp.size()), 2); conn.SendNow(tmp.data(), tmp.size()); conn.SendNow("\340", 1); // Begin track object if (!skipDynamic){ const Util::RelAccX &fragments = tracks.at(*it).fragments; const Util::RelAccX &keys = tracks.at(*it).keys; const Util::RelAccX &parts = tracks.at(*it).parts; size_t fragBegin = fragments.getStartPos(); size_t fragCount = fragments.getPresent(); size_t keyBegin = keys.getStartPos(); size_t keyCount = keys.getPresent(); size_t partBegin = parts.getStartPos(); size_t partCount = parts.getPresent(); conn.SendNow("\000\011fragments\002", 12); conn.SendNow(c32(fragCount * DTSH_FRAGMENT_SIZE), 4); for (size_t i = 0; i < fragCount; i++){ conn.SendNow(c32(fragments.getInt("duration", i + fragBegin)), 4); conn.SendNow(std::string(1, (char)fragments.getInt("keys", i + fragBegin))); conn.SendNow(c32(fragments.getInt("firstkey", i + fragBegin) + 1), 4); conn.SendNow(c32(fragments.getInt("size", i + fragBegin)), 4); } conn.SendNow("\000\004keys\002", 7); conn.SendNow(c32(keyCount * DTSH_KEY_SIZE), 4); for (size_t i = 0; i < keyCount; i++){ conn.SendNow(c64(keys.getInt("bpos", i + fragBegin)), 8); conn.SendNow(c24(keys.getInt("duration", i + keyBegin)), 3); conn.SendNow(c32(keys.getInt("number", i + keyBegin)), 4); conn.SendNow(c16(keys.getInt("parts", i + keyBegin)), 2); conn.SendNow(c64(keys.getInt("time", i + keyBegin)), 8); } conn.SendNow("\000\010keysizes\002,", 11); conn.SendNow(c32(keyCount * 4), 4); for (size_t i = 0; i < keyCount; i++){ conn.SendNow(c32(keys.getInt("size", i + keyBegin)), 4); } conn.SendNow("\000\005parts\002", 8); conn.SendNow(c32(partCount * DTSH_PART_SIZE), 4); for (size_t i = 0; i < partCount; i++){ conn.SendNow(c24(parts.getInt("size", i + partBegin)), 3); conn.SendNow(c24(parts.getInt("duration", i + partBegin)), 3); conn.SendNow(c24(parts.getInt("offset", i + partBegin)), 3); } } const Util::RelAccX &track = tracks.at(*it).track; conn.SendNow("\000\007trackid\001", 10); if (reID){ conn.SendNow(c64((*it) + 1), 8); }else{ conn.SendNow(c64(track.getInt("id")), 8); } if (!skipDynamic && track.getInt("missedFrags")){ conn.SendNow("\000\014missed_frags\001", 15); conn.SendNow(c64(track.getInt("missedFrags")), 8); } conn.SendNow("\000\007firstms\001", 10); conn.SendNow(c64(track.getInt("firstms")), 8); conn.SendNow("\000\006lastms\001", 9); conn.SendNow(c64(track.getInt("lastms")), 8); conn.SendNow("\000\003bps\001", 6); conn.SendNow(c64(track.getInt("bps")), 8); conn.SendNow("\000\006maxbps\001", 9); conn.SendNow(c64(track.getInt("maxbps")), 8); tmp = getInit(*it); conn.SendNow("\000\004init\002", 7); conn.SendNow(c32(tmp.size()), 4); conn.SendNow(tmp.data(), tmp.size()); tmp = getCodec(*it); conn.SendNow("\000\005codec\002", 8); conn.SendNow(c32(tmp.size()), 4); conn.SendNow(tmp.data(), tmp.size()); tmp = getLang(*it); if (tmp.size() && tmp != "und"){ conn.SendNow("\000\004lang\002", 7); conn.SendNow(c32(tmp.size()), 4); conn.SendNow(tmp.data(), tmp.size()); } tmp = getType(*it); conn.SendNow("\000\004type\002", 7); conn.SendNow(c32(tmp.size()), 4); conn.SendNow(tmp.data(), tmp.size()); if (tmp == "audio"){ conn.SendNow("\000\004rate\001", 7); conn.SendNow(c64(track.getInt("rate")), 8); conn.SendNow("\000\004size\001", 7); conn.SendNow(c64(track.getInt("size")), 8); conn.SendNow("\000\010channels\001", 11); conn.SendNow(c64(track.getInt("channels")), 8); }else if (tmp == "video"){ conn.SendNow("\000\005width\001", 8); conn.SendNow(c64(track.getInt("width")), 8); conn.SendNow("\000\006height\001", 9); conn.SendNow(c64(track.getInt("height")), 8); conn.SendNow("\000\004fpks\001", 7); conn.SendNow(c64(track.getInt("fpks")), 8); } conn.SendNow("\000\000\356", 3); // End this track Object } conn.SendNow("\000\000\356", 3); // End tracks object conn.SendNow("\000\000\356", 3); // End global object } /// Returns true if the given track index is marked as valid and present in the tracks structure. bool Meta::trackLoaded(size_t idx) const{ if (!trackValid(idx)){return false;} if (!tracks.count(idx)){ INFO_MSG("Track %zu is not yet loaded", idx); return false; } return true; } /// Returns true if the given track index is marked as valid. For this the track does not have to /// be loaded as well uint8_t Meta::trackValid(size_t idx) const{ if (idx > trackList.getPresent()){return 0;} return trackList.getInt(trackValidField, idx); } /// Returns the current highest track index (zero-based). size_t Meta::trackCount() const{return trackList.getPresent();} /// Returns the index the first video track, or the first track. /// Will print a WARN-level message if there are no tracks. size_t Meta::mainTrack() const{ if (!trackList.getPresent()){return INVALID_TRACK_ID;} std::set validTracks = getValidTracks(); for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ if (getType(*it) == "video"){return *it;} } return *validTracks.begin(); } /// Returns the duration of the longest fragment in the given track. uint32_t Meta::biggestFragment(uint32_t idx) const{ if (!trackList.getPresent()){return 0;} uint32_t trackIdx = (idx == INVALID_TRACK_ID ? mainTrack() : idx); if (!tM.count(trackIdx)){return 0;} DTSC::Fragments fragments(tracks.at(trackIdx).fragments); uint64_t firstFragment = fragments.getFirstValid(); uint64_t endFragment = fragments.getEndValid(); uint32_t ret = 0; for (uint64_t i = firstFragment; i < endFragment; i++){ uint32_t fragDur = fragments.getDuration(i); if (fragDur > ret){ret = fragDur;} } return ret; } bool Meta::tracksAlign(size_t idx1, size_t idx2) const{ if (!tM.count(idx1) || !tM.count(idx2)){return false;} DTSC::Fragments frag1(tracks.at(idx1).fragments); DTSC::Fragments frag2(tracks.at(idx2).fragments); if (frag1.getFirstValid() >= frag2.getFirstValid()){ size_t firstValid = frag1.getFirstValid(); size_t firstTime = getTimeForFragmentIndex(idx1, firstValid); size_t secondIndex = getFragmentIndexForTime(idx2, firstTime); size_t count = std::min(frag1.getValidCount(), frag2.getEndValid() - secondIndex); if (count <= 2){ INFO_MSG("Determining track alignment between track %zu and %zu based on %zu fragments, " "might be inaccurate", idx1, idx2, count); } for (size_t i = 0; i < count; i++){ if (getTimeForFragmentIndex(idx1, firstValid + i) != getTimeForFragmentIndex(idx2, secondIndex + i)){ return false; } } }else{ size_t firstValid = frag2.getFirstValid(); size_t firstTime = getTimeForFragmentIndex(idx2, firstValid); size_t secondIndex = getFragmentIndexForTime(idx1, firstTime); size_t count = std::min(frag2.getValidCount(), frag1.getEndValid() - secondIndex); if (count <= 2){ INFO_MSG("Determining track alignment between track %zu and %zu based on %zu fragments, " "might be inaccurate", idx1, idx2, count); } for (size_t i = 0; i < count; i++){ if (getTimeForFragmentIndex(idx2, firstValid + i) != getTimeForFragmentIndex(idx1, secondIndex + i)){ return false; } } } return true; } /// Gets indice of the fragment containing timestamp, or last fragment if nowhere. uint32_t Meta::getFragmentIndexForTime(uint32_t idx, uint64_t timestamp) const{ DTSC::Fragments fragments(tracks.at(idx).fragments); DTSC::Keys keys(tracks.at(idx).keys); uint32_t firstFragment = fragments.getFirstValid(); uint32_t endFragment = fragments.getEndValid(); for (size_t i = firstFragment; i < endFragment; i++){ uint32_t keyNumber = fragments.getFirstKey(i); uint32_t duration = fragments.getDuration(i); if (timestamp < keys.getTime(keyNumber) + duration){return i;} } if (endFragment > firstFragment){ if (timestamp < getLastms(idx)){return endFragment - 1;} } return endFragment; } /// Returns the timestamp for the given key index in the given track index uint64_t Meta::getTimeForKeyIndex(uint32_t idx, uint32_t keyIdx) const{ DTSC::Keys keys(tracks.at(idx).keys); return keys.getTime(keyIdx); } /// Returns indice of the key containing timestamp, or last key if nowhere. uint32_t Meta::getKeyIndexForTime(uint32_t idx, uint64_t timestamp) const{ DTSC::Keys keys(tracks.at(idx).keys); uint32_t firstKey = keys.getFirstValid(); uint32_t endKey = keys.getEndValid(); for (size_t i = firstKey; i < endKey; i++){ if (keys.getTime(i) + keys.getDuration(i) > timestamp){return i;} } return endKey; } /// Returns the tiestamp for the given fragment index in the given track index. uint64_t Meta::getTimeForFragmentIndex(uint32_t idx, uint32_t fragmentIdx) const{ DTSC::Fragments fragments(tracks.at(idx).fragments); DTSC::Keys keys(tracks.at(idx).keys); return keys.getTime(fragments.getFirstKey(fragmentIdx)); } /// Returns the part index for the given timestamp. /// Assumes the Packet is for the given track, and assumes the metadata and track data are not out /// of sync. Works by looking up the key for the Packet's timestamp, then walking through the /// parts until the time matches or exceeds the time of the Packet. Returns zero if the track /// index is invalid or if the timestamp cannot be found. uint32_t Meta::getPartIndex(uint64_t timestamp, size_t idx) const{ if (idx == INVALID_TRACK_ID){return 0;} uint32_t res = 0; uint32_t keyIdx = getKeyIndexForTime(idx, timestamp); DTSC::Keys Keys(keys(idx)); DTSC::Parts Parts(parts(idx)); uint64_t currentTime = Keys.getTime(keyIdx); res = Keys.getFirstPart(keyIdx); size_t endPart = Parts.getEndValid(); for (size_t i = res; i < endPart; i++){ if (currentTime >= timestamp){return res;} currentTime += Parts.getDuration(i); res++; } return res; } /// Returns the part timestamp for the given index. /// Assumes the Packet is for the given track, and assumes the metadata and track data are not out /// of sync. Works by looking up the packet's key's timestamp, then walking through the /// parts adding up durations until we reach the part we want. Returns zero if the track /// index is invalid or if the timestamp cannot be found. uint64_t Meta::getPartTime(uint32_t partIndex, size_t idx) const{ if (idx == INVALID_TRACK_ID){return 0;} DTSC::Keys Keys(keys(idx)); DTSC::Parts Parts(parts(idx)); size_t kId = 0; for (kId = 0; kId < Keys.getEndValid(); ++kId){ size_t keyPartId = Keys.getFirstPart(kId); if (keyPartId+Keys.getParts(kId) > partIndex){ //It's inside this key. Step through. uint64_t res = Keys.getTime(kId); while (keyPartId < partIndex){ res += Parts.getDuration(keyPartId); ++keyPartId; } return res; } } return 0; } /// Given the current page, check if the next page is available. Returns true if it is. bool Meta::nextPageAvailable(uint32_t idx, size_t currentPage) const{ const Util::RelAccX &pages = tracks.at(idx).pages; for (size_t i = pages.getStartPos(); i + 1 < pages.getEndPos(); ++i){ if (pages.getInt("firstkey", i) == currentPage){return pages.getInt("avail", i + 1);} } return false; } /// Given a timestamp, returns the page number that timestamp can be found on. /// If the timestamp is not available, returns the closest page number that is. size_t Meta::getPageNumberForTime(uint32_t idx, uint64_t time) const{ const Util::RelAccX &pages = tracks.at(idx).pages; Util::RelAccXFieldData avail = pages.getFieldData("avail"); Util::RelAccXFieldData firsttime = pages.getFieldData("firsttime"); uint32_t res = pages.getStartPos(); uint64_t endPos = pages.getEndPos(); for (uint64_t i = res; i < endPos; ++i){ if (pages.getInt(avail, i) == 0){continue;} if (pages.getInt(firsttime, i) > time){break;} res = i; } DONTEVEN_MSG("Page number for time %" PRIu64 " on track %" PRIu32 " can be found on page %" PRIu64, time, idx, pages.getInt("firstkey", res)); return pages.getInt("firstkey", res); } /// Given a key, returns the page number it can be found on. /// If the key is not available, returns the closest page that is. size_t Meta::getPageNumberForKey(uint32_t idx, uint64_t keyNum) const{ const Util::RelAccX &pages = tracks.at(idx).pages; size_t res = pages.getStartPos(); for (size_t i = pages.getStartPos(); i < pages.getEndPos(); ++i){ if (pages.getInt("avail", i) == 0){continue;} if (pages.getInt("firstkey", i) > keyNum){break;} res = i; } return pages.getInt("firstkey", res); } /// Returns the key number containing a given time. /// Or, closest key if given time is not available. /// Or, INVALID_KEY_NUM if no keys are available at all. /// If the time is in the gap before a key, returns that next key instead. size_t Meta::getKeyNumForTime(uint32_t idx, uint64_t time) const{ const Track &trk = tracks.at(idx); const Util::RelAccX &keys = trk.keys; const Util::RelAccX &parts = trk.parts; if (!keys.getEndPos()){return INVALID_KEY_NUM;} size_t res = keys.getStartPos(); for (size_t i = res; i < keys.getEndPos(); i++){ if (keys.getInt(trk.keyTimeField, i) > time){ //It's possible we overshot our timestamp, but the previous key does not contain it. //This happens when seeking to a timestamp past the last part of the previous key, but //before the first part of the next key. //In this case, we should _not_ return the previous key, but the current key. //That prevents getting stuck at the end of the page, waiting for a part to show up that never will. if (keys.getInt(trk.keyFirstPartField, i) > parts.getStartPos()){ uint64_t dur = parts.getInt(trk.partDurationField, keys.getInt(trk.keyFirstPartField, i)-1); if (keys.getInt(trk.keyTimeField, i) - dur < time){res = i;} } continue; } res = i; } DONTEVEN_MSG("Key number for time %" PRIu64 " on track %" PRIu32 " is %zu", time, idx, res); return res; } /// By reference, returns a JSON object with health information on the stream void Meta::getHealthJSON(JSON::Value &retRef) const{ // clear the reference of old data, first retRef.null(); bool hasH264 = false; bool hasAAC = false; std::stringstream issues; std::set validTracks = getValidTracks(); uint64_t jitter = 0; uint64_t buffer = 0; for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ size_t i = *it; JSON::Value &track = retRef[getTrackIdentifier(i)]; uint64_t minKeep = getMinKeepAway(*it); track["jitter"] = minKeep; std::string codec = getCodec(i); std::string type = getType(i); track["kbits"] = getBps(i) * 8 / 1024; track["codec"] = codec; uint32_t shrtest_key = 0xFFFFFFFFul; uint32_t longest_key = 0; uint32_t shrtest_prt = 0xFFFFFFFFul; uint32_t longest_prt = 0; uint32_t shrtest_cnt = 0xFFFFFFFFul; uint32_t longest_cnt = 0; DTSC::Keys Mkeys(keys(i)); uint32_t firstKey = Mkeys.getFirstValid(); uint32_t endKey = Mkeys.getEndValid(); for (uint32_t k = firstKey; k+1 < endKey; k++){ uint64_t kDur = Mkeys.getDuration(k); uint64_t kParts = Mkeys.getParts(k); if (!kDur){continue;} if (kDur > longest_key){longest_key = kDur;} if (kDur < shrtest_key){shrtest_key = kDur;} if (kParts > longest_cnt){longest_cnt = kParts;} if (kParts < shrtest_cnt){shrtest_cnt = kParts;} if ((kDur / kParts) > longest_prt){longest_prt = (kDur / kParts);} if ((kDur / kParts) < shrtest_prt){shrtest_prt = (kDur / kParts);} } track["keys"]["ms_min"] = shrtest_key; track["keys"]["ms_max"] = longest_key; track["keys"]["frame_ms_min"] = shrtest_prt; track["keys"]["frame_ms_max"] = longest_prt; track["keys"]["frames_min"] = shrtest_cnt; track["keys"]["frames_max"] = longest_cnt; uint64_t trBuffer = getLastms(i) - getFirstms(i); track["buffer"] = trBuffer; size_t srcTrk = getSourceTrack(i); if (srcTrk != INVALID_TRACK_ID){ if (trackValid(srcTrk)){ track["source"] = getTrackIdentifier(srcTrk); }else{ track["source"] = "Invalid track " + JSON::Value((uint64_t)srcTrk).asString(); } }else{ if (jitter < minKeep){jitter = minKeep;} if (longest_prt > 500){ issues << "unstable connection (" << longest_prt << "ms " << codec << " frame)! "; } if (shrtest_cnt < 6){ issues << "unstable connection (" << shrtest_cnt << " " << codec << " frame(s) in key)! "; } if (longest_key > shrtest_key*1.30){ issues << "unstable key interval (" << (uint32_t)(((longest_key/shrtest_key)-1)*100) << "% " << codec << " variance)! "; } } if (buffer < trBuffer){buffer = trBuffer;} if (codec == "AAC"){hasAAC = true;} if (codec == "H264"){hasH264 = true;} if (type == "video"){ track["width"] = getWidth(i); track["height"] = getHeight(i); track["fpks"] = getFpks(i); track["bframes"] = hasBFrames(i); } if (type == "audio"){ track["rate"] = getRate(i); track["channels"] = getChannels(i); } } if (jitter > 500){ issues << "High jitter (" << jitter << "ms)! "; } retRef["jitter"] = jitter; retRef["buffer"] = buffer; if (getMaxKeepAway()){ retRef["maxkeepaway"] = getMaxKeepAway(); } if ((hasAAC || hasH264) && validTracks.size() > 1){ if (!hasAAC){issues << "HLS no audio!";} if (!hasH264){issues << "HLS no video!";} } if (issues.str().size()){retRef["issues"] = issues.str();} // return is by reference } /// Returns true if the tracks idx1 and idx2 are keyframe aligned bool Meta::keyTimingsMatch(size_t idx1, size_t idx2) const { const DTSC::Track &t1 = tracks.at(idx1); const DTSC::Track &t2 = tracks.at(idx2); uint64_t t1Firstms = t1.track.getInt(t1.trackFirstmsField); uint64_t t2Firstms = t2.track.getInt(t2.trackFirstmsField); uint64_t firstms = t1Firstms > t2Firstms ? t1Firstms : t2Firstms; uint64_t t1Lastms = t1.track.getInt(t1.trackFirstmsField); uint64_t t2Lastms = t2.track.getInt(t2.trackFirstmsField); uint64_t lastms = t1Lastms > t2Lastms ? t1Lastms : t2Lastms; if (firstms > lastms) { WARN_MSG("Cannot check for timing alignment for tracks %zu and %zu: No overlap", idx1, idx2); return false; } uint32_t keyIdx1 = getKeyIndexForTime(idx1,firstms); uint32_t keyIdx2 = getKeyIndexForTime(idx2,firstms); DTSC::Keys keys1(tracks.at(idx1).keys); DTSC::Keys keys2(tracks.at(idx2).keys); while(true) { if (lastms < keys1.getTime(keyIdx1) || lastms < keys2.getTime(keyIdx2)) {return true;} if (keys1.getTime(keyIdx1) != keys2.getTime(keyIdx2)) {return false;} keyIdx1++; keyIdx2++; } return true; } Parts::Parts(const Util::RelAccX &_parts) : parts(_parts){ sizeField = parts.getFieldData("size"); durationField = parts.getFieldData("duration"); offsetField = parts.getFieldData("offset"); } size_t Parts::getFirstValid() const{return parts.getDeleted();} size_t Parts::getEndValid() const{return parts.getEndPos();} size_t Parts::getValidCount() const{return getEndValid() - getFirstValid();} size_t Parts::getSize(size_t idx) const{return parts.getInt(sizeField, idx);} uint64_t Parts::getDuration(size_t idx) const{return parts.getInt(durationField, idx);} int64_t Parts::getOffset(size_t idx) const{return parts.getInt(offsetField, idx);} Keys::Keys(Util::RelAccX &_keys) : isConst(false), keys(_keys), cKeys(_keys){ firstPartField = cKeys.getFieldData("firstpart"); bposField = cKeys.getFieldData("bpos"); durationField = cKeys.getFieldData("duration"); numberField = cKeys.getFieldData("number"); partsField = cKeys.getFieldData("parts"); timeField = cKeys.getFieldData("time"); sizeField = cKeys.getFieldData("size"); } Keys::Keys(const Util::RelAccX &_keys) : isConst(true), keys(empty), cKeys(_keys){ firstPartField = cKeys.getFieldData("firstpart"); bposField = cKeys.getFieldData("bpos"); durationField = cKeys.getFieldData("duration"); numberField = cKeys.getFieldData("number"); partsField = cKeys.getFieldData("parts"); timeField = cKeys.getFieldData("time"); sizeField = cKeys.getFieldData("size"); } size_t Keys::getFirstValid() const{return cKeys.getDeleted();} size_t Keys::getEndValid() const{return cKeys.getEndPos();} size_t Keys::getValidCount() const{return getEndValid() - getFirstValid();} size_t Keys::getFirstPart(size_t idx) const{return cKeys.getInt(firstPartField, idx);} size_t Keys::getBpos(size_t idx) const{return cKeys.getInt(bposField, idx);} uint64_t Keys::getDuration(size_t idx) const{return cKeys.getInt(durationField, idx);} size_t Keys::getNumber(size_t idx) const{return cKeys.getInt(numberField, idx);} size_t Keys::getParts(size_t idx) const{return cKeys.getInt(partsField, idx);} uint64_t Keys::getTime(size_t idx) const{return cKeys.getInt(timeField, idx);} void Keys::setSize(size_t idx, size_t _size){ if (isConst){return;} keys.setInt(sizeField, _size, idx); } size_t Keys::getSize(size_t idx) const{return cKeys.getInt(sizeField, idx);} Fragments::Fragments(const Util::RelAccX &_fragments) : fragments(_fragments){} size_t Fragments::getFirstValid() const{return fragments.getDeleted();} size_t Fragments::getEndValid() const{return fragments.getEndPos();} size_t Fragments::getValidCount() const{return getEndValid() - getFirstValid();} uint64_t Fragments::getDuration(size_t idx) const{return fragments.getInt("duration", idx);} size_t Fragments::getKeycount(size_t idx) const{return fragments.getInt("keys", idx);} size_t Fragments::getFirstKey(size_t idx) const{return fragments.getInt("firstkey", idx);} size_t Fragments::getSize(size_t idx) const{return fragments.getInt("size", idx);} }// namespace DTSC