From 4df771eb028997526a48f15880fb7cdf71e8fc4e Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 27 Mar 2023 23:16:51 +0200 Subject: [PATCH] Optimization that allows Socket::Buffer to write directly to a Util::ResizeablePointer --- lib/dtsc.cpp | 17 ++++++++++------- lib/socket.cpp | 19 +++++++++++++++++++ lib/socket.h | 3 +++ lib/urireader.cpp | 5 +++-- lib/websocket.cpp | 18 ++++++++---------- src/analysers/analyser_dtsc.cpp | 10 +++++++--- src/input/input_ts.cpp | 8 +++++--- 7 files changed, 55 insertions(+), 25 deletions(-) diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 826128d7..250f7b46 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -130,20 +130,23 @@ namespace DTSC{ void Packet::reInit(Socket::Connection &src){ int sleepCount = 0; null(); - int toReceive = 0; + Util::ResizeablePointer ptr; while (src.connected()){ - if (!toReceive && src.Received().available(8)){ + if (!ptr.rsize() && src.Received().available(8)){ if (src.Received().copy(2) != "DT"){ WARN_MSG("Invalid DTSC Packet header encountered (%s)", Encodings::Hex::encode(src.Received().copy(4)).c_str()); break; } - toReceive = Bit::btohl(src.Received().copy(8).data() + 4); + ptr.allocate(Bit::btohl(src.Received().copy(8).data() + 4) + 8); } - if (toReceive && src.Received().available(toReceive + 8)){ - std::string dataBuf = src.Received().remove(toReceive + 8); - reInit(dataBuf.data(), dataBuf.size()); - return; + unsigned int readable = src.Received().bytes(ptr.rsize() - ptr.size()); + if (ptr.rsize() && readable){ + src.Received().remove(ptr, readable); + if (ptr.size() == ptr.rsize()){ + reInit(ptr, ptr.size()); + return; + } } if (!src.spool()){ if (sleepCount++ > 750){ diff --git a/lib/socket.cpp b/lib/socket.cpp index f53ce93f..abf99a1b 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -473,6 +473,25 @@ std::string Socket::Buffer::remove(unsigned int count){ return ret; } +/// Removes count bytes from the buffer, appending them to the given ptr. +/// Does nothing if not all count bytes are available. +void Socket::Buffer::remove(Util::ResizeablePointer & ptr, unsigned int count){ + size(); + if (!available(count)){return;} + unsigned int i = 0; + for (std::deque::reverse_iterator it = data.rbegin(); it != data.rend(); ++it){ + if (i + (*it).size() < count){ + ptr.append(*it); + i += (*it).size(); + (*it).clear(); + }else{ + ptr.append(it->data(), count - i); + (*it).erase(0, count - i); + break; + } + } +} + /// Copies count bytes from the buffer, returning them by value. /// Returns an empty string if not all count bytes are available. std::string Socket::Buffer::copy(unsigned int count){ diff --git a/lib/socket.h b/lib/socket.h index b369ac84..e6366fcd 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -26,6 +26,8 @@ #include "mbedtls/ssl.h" #endif +#include "util.h" + // for being friendly with Socket::Connection down below namespace Buffer{ class user; @@ -68,6 +70,7 @@ namespace Socket{ bool available(unsigned int count); bool available(unsigned int count) const; std::string remove(unsigned int count); + void remove(Util::ResizeablePointer & ptr, unsigned int count); std::string copy(unsigned int count); void clear(); }; diff --git a/lib/urireader.cpp b/lib/urireader.cpp index 72496802..d096777a 100644 --- a/lib/urireader.cpp +++ b/lib/urireader.cpp @@ -338,8 +338,9 @@ namespace HTTP{ return; } } - std::string buf = downer.getSocket().Received().remove(s); - cb.dataCallback(buf.data(), s); + Util::ResizeablePointer buf; + downer.getSocket().Received().remove(buf, s); + cb.dataCallback(buf, s); } } diff --git a/lib/websocket.cpp b/lib/websocket.cpp index c82b4cba..0fdf3e63 100644 --- a/lib/websocket.cpp +++ b/lib/websocket.cpp @@ -156,19 +156,17 @@ namespace HTTP{ return false; } C.Received().remove(headSize); // delete the header - std::string pl = C.Received().remove(payLen); - if (masked){ - // If masked, apply the mask to the payload - const char *mask = head.data() + headSize - 4; // mask is last 4 bytes of header - for (uint32_t i = 0; i < payLen; ++i){pl[i] ^= mask[i % 4];} - } if ((head[0] & 0xF)){ // Non-continuation frameType = (head[0] & 0xF); - data.assign(pl.data(), pl.size()); - }else{ - // Continuation - data.append(pl.data(), pl.size()); + data.truncate(0); + } + size_t preSize = data.size(); + C.Received().remove(data, payLen); + if (masked){ + // If masked, apply the mask to the payload + const char *mask = head.data() + headSize - 4; // mask is last 4 bytes of header + for (uint32_t i = 0; i < payLen; ++i){data[i+preSize] ^= mask[i % 4];} } if (head[0] & 0x80){ // FIN diff --git a/src/analysers/analyser_dtsc.cpp b/src/analysers/analyser_dtsc.cpp index bb10997b..95f2c5a9 100644 --- a/src/analysers/analyser_dtsc.cpp +++ b/src/analysers/analyser_dtsc.cpp @@ -30,11 +30,15 @@ AnalyserDTSC::AnalyserDTSC(Util::Config &conf) : Analyser(conf){ bool AnalyserDTSC::parsePacket(){ if (headLess){ + Util::ResizeablePointer dataBuf; while (conn){ - if (!conn.spool()){Util::sleep(50);} + if (conn.spool()){ + conn.Received().remove(dataBuf, conn.Received().bytes(0xFFFFFFFFul)); + }else{ + Util::sleep(50); + } } - std::string dataBuf = conn.Received().remove(conn.Received().bytes(0xFFFFFFFFul)); - DTSC::Scan S((char *)dataBuf.data(), dataBuf.size()); + DTSC::Scan S(dataBuf, dataBuf.size()); std::cout << S.toPrettyString() << std::endl; return false; } diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index f9b28d1c..18b8572d 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -534,6 +534,7 @@ namespace Mist{ bool hasStarted = false; cfgPointer = config; globalStreamName = streamName; + Util::ResizeablePointer newData; unsigned long long threadCheckTimer = Util::bootSecs(); while (config->is_active){ if (tcpCon){ @@ -543,10 +544,11 @@ namespace Mist{ tcpCon.Received().remove(1); } if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){ - std::string newData = tcpCon.Received().remove(188); + newData.truncate(0); + tcpCon.Received().remove(newData, 188); if (rawMode){ keepAlive(); - rawBuffer.append(newData); + rawBuffer.append(newData, newData.size()); if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ if (rawIdx == INVALID_TRACK_ID){ rawIdx = meta.addTrack(); @@ -562,7 +564,7 @@ namespace Mist{ rawBuffer.truncate(0); } }else { - tsBuf.FromPointer(newData.data()); + tsBuf.FromPointer(newData); liveStream.add(tsBuf); if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} }