From 213318bfa35a24c3873b53d9d2752eb414fcc5ad Mon Sep 17 00:00:00 2001 From: dijunkun Date: Tue, 5 Sep 2023 17:38:10 +0800 Subject: [PATCH] Implementation for rtp module --- src/ringbuffer/ringbuffer.h | 90 ++++++++--------- src/rtp/rtp_packet.cpp | 134 ++++++++++++++++++++++++-- src/rtp/rtp_packet.h | 63 +++++++----- src/transmission/ice_transmission.cpp | 32 ++++-- src/transmission/ice_transmission.h | 5 +- xmake.lua | 8 +- 6 files changed, 242 insertions(+), 90 deletions(-) diff --git a/src/ringbuffer/ringbuffer.h b/src/ringbuffer/ringbuffer.h index bf73ef1..312328c 100644 --- a/src/ringbuffer/ringbuffer.h +++ b/src/ringbuffer/ringbuffer.h @@ -3,50 +3,50 @@ #include +class Data { + public: + Data() = default; + + Data(const char* data, size_t size) { + data_ = new char[size]; + memcpy(data_, data, size); + size_ = size; + } + + Data(const Data& obj) { + data_ = new char[obj.size_]; + memcpy(data_, obj.data_, obj.size_); + size_ = obj.size_; + } + + Data& operator=(const Data& obj) { + data_ = new char[obj.size_]; + memcpy(data_, obj.data_, obj.size_); + size_ = obj.size_; + return *this; + } + + ~Data() { + if (data_) { + delete data_; + data_ = nullptr; + } + size_ = 0; + } + + size_t size() const { return size_; } + char* data() const { return data_; } + + public: + char* data_ = nullptr; + size_t size_ = 0; +}; + +template class RingBuffer { - public: - class Data { - public: - Data() = default; - - Data(const char* data, size_t size) { - data_ = new char[size]; - memcpy(data_, data, size); - size_ = size; - } - - Data(const Data& obj) { - data_ = new char[obj.size_]; - memcpy(data_, obj.data_, obj.size_); - size_ = obj.size_; - } - - Data& operator=(const Data& obj) { - data_ = new char[obj.size_]; - memcpy(data_, obj.data_, obj.size_); - size_ = obj.size_; - return *this; - } - - ~Data() { - if (data_) { - delete data_; - data_ = nullptr; - } - size_ = 0; - } - - size_t size() const { return size_; } - char* data() const { return data_; } - - public: - char* data_ = nullptr; - size_t size_ = 0; - }; - public: RingBuffer(unsigned size = 128) : m_size(size), m_front(0), m_rear(0) { - m_data = new Data[size]; + m_data = new T[size]; } ~RingBuffer() { @@ -58,7 +58,7 @@ class RingBuffer { inline bool isFull() const { return m_front == (m_rear + 1) % m_size; } - bool push(const Data& value) { + bool push(const T& value) { if (isFull()) { return false; } @@ -67,7 +67,7 @@ class RingBuffer { return true; } - bool push(const Data* value) { + bool push(const T* value) { if (isFull()) { return false; } @@ -76,7 +76,7 @@ class RingBuffer { return true; } - inline bool pop(Data& value) { + inline bool pop(T& value) { if (isEmpty()) { return false; } @@ -95,7 +95,7 @@ class RingBuffer { unsigned int m_size; int m_front; int m_rear; - Data* m_data; + T* m_data; }; #endif \ No newline at end of file diff --git a/src/rtp/rtp_packet.cpp b/src/rtp/rtp_packet.cpp index 8e93bf5..17c52f7 100644 --- a/src/rtp/rtp_packet.cpp +++ b/src/rtp/rtp_packet.cpp @@ -1,6 +1,7 @@ #include "rtp_packet.h" #include + // 0 1 2 3 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -13,7 +14,7 @@ // | Contributing source (CSRC) identifiers | // | .... | // +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -// | header eXtension profile id | length in 32bits | +// | defined by profile | length | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // | Extensions | // | .... | @@ -24,26 +25,141 @@ // | padding | Padding size | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +constexpr size_t kDefaultMtu = 1400; + +RtpPacket::RtpPacket() : buffer_(new uint8_t[kDefaultMtu]), size_(kDefaultMtu) { + memset(buffer_, 0, kDefaultMtu); +} + RtpPacket::RtpPacket(const uint8_t *buffer, size_t size) { if (size > 0) { - payload_ = new uint8_t[size]; - memcpy(payload_, buffer, size); - payload_size_ = size; + buffer_ = new uint8_t[size]; + memcpy(buffer_, buffer, size); + size_ = size; } } -RtpPacket::RtpPacket(const RtpPacket &rhs) { - if (rhs.payload_size_ > 0) { - payload_ = new uint8_t[rhs.payload_size_]; - memcpy(payload_, rhs.payload_, rhs.payload_size_); - payload_size_ = rhs.payload_size_; +RtpPacket::RtpPacket(const RtpPacket &rtp_packet) { + if (rtp_packet.size_ > 0) { + buffer_ = new uint8_t[rtp_packet.size_]; + memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_); + size_ = rtp_packet.size_; } } +RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) { + if (&rtp_packet != this && rtp_packet.size_ > 0) { + buffer_ = new uint8_t[rtp_packet.size_]; + memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_); + size_ = rtp_packet.size_; + } + return *this; +} + +RtpPacket::RtpPacket(RtpPacket &&rtp_packet) + : buffer_((uint8_t *)rtp_packet.Buffer()), size_(rtp_packet.Size()) {} + RtpPacket::~RtpPacket() { + if (buffer_) { + delete buffer_; + buffer_ = nullptr; + } + size_ = 0; + + if (extension_data_) { + delete extension_data_; + extension_data_ = nullptr; + } + extension_len_ = 0; + if (payload_) { delete payload_; payload_ = nullptr; } payload_size_ = 0; +} + +const uint8_t *RtpPacket::Encode(uint8_t *payload, size_t payload_size) { + buffer_[0] = (version_ << 6) | (has_padding_ << 5) | (has_extension_ << 4) | + total_csrc_number_; + buffer_[1] = (marker_ << 7) | payload_type_; + buffer_[2] = (sequence_number_ >> 8) & 0xFF; + buffer_[3] = sequence_number_ & 0xFF; + buffer_[4] = (timestamp_ >> 24) & 0xFF; + buffer_[5] = (timestamp_ >> 16) & 0xFF; + buffer_[6] = (timestamp_ >> 8) & 0xFF; + buffer_[7] = timestamp_ & 0xFF; + buffer_[8] = (ssrc_ >> 24) & 0xFF; + buffer_[9] = (ssrc_ >> 16) & 0xFF; + buffer_[10] = (ssrc_ >> 8) & 0xFF; + buffer_[11] = ssrc_ & 0xFF; + + for (uint32_t index = 0; index < total_csrc_number_ && !csrcs_.empty(); + index++) { + buffer_[12 + index] = (csrcs_[index] >> 24) & 0xFF; + buffer_[13 + index] = (csrcs_[index] >> 16) & 0xFF; + buffer_[14 + index] = (csrcs_[index] >> 8) & 0xFF; + buffer_[15 + index] = csrcs_[index] & 0xFF; + } + + uint32_t extension_offset = + total_csrc_number_ && !csrcs_.empty() ? total_csrc_number_ * 4 : 0; + if (has_extension_ && extension_data_) { + buffer_[12 + extension_offset] = extension_profile_ >> 8; + buffer_[13 + extension_offset] = extension_profile_ & 0xff; + buffer_[14 + extension_offset] = (extension_len_ >> 8) & 0xFF; + buffer_[15 + extension_offset] = extension_len_ & 0xFF; + memcpy(buffer_ + 16 + extension_offset, extension_data_, extension_len_); + } + + uint32_t payload_offset = + (has_extension_ && extension_data_ ? extension_len_ : 0) + + extension_offset; + + memcpy(buffer_ + 12 + payload_offset, payload, payload_size); + size_ = payload_size + (12 + payload_offset); + + return buffer_; +} + +const uint8_t *RtpPacket::Decode() { + version_ = (buffer_[0] >> 6) & 0x03; + has_padding_ = (buffer_[0] >> 5) & 0x01; + has_extension_ = (buffer_[0] >> 4) & 0x01; + total_csrc_number_ = buffer_[0] & 0x0f; + marker_ = (buffer_[1] >> 7) & 0x01; + payload_type_ = buffer_[1] & 0x7f; + sequence_number_ = (buffer_[2] << 8) | buffer_[3]; + timestamp_ = + (buffer_[4] << 24) | (buffer_[5] << 16) | (buffer_[6] << 8) | buffer_[7]; + ssrc_ = (buffer_[8] << 24) | (buffer_[9] << 16) | (buffer_[10] << 8) | + buffer_[11]; + + for (uint32_t index = 0; index < total_csrc_number_; index++) { + uint32_t csrc = (buffer_[12 + index] << 24) | (buffer_[13 + index] << 16) | + (buffer_[14 + index] << 8) | buffer_[15 + index]; + csrcs_.push_back(csrc); + } + + uint32_t extension_offset = total_csrc_number_ * 4; + if (has_extension_) { + extension_profile_ = + (buffer_[12 + extension_offset] << 8) | buffer_[13 + extension_offset]; + extension_len_ = + (buffer_[14 + extension_offset] << 8) | buffer_[15 + extension_offset]; + + // extension_data_ = new uint8_t[extension_len_]; + // memcpy(extension_data_, buffer_ + 16 + extension_offset, extension_len_); + extension_data_ = buffer_ + 16 + extension_offset; + } + + uint32_t payload_offset = + (has_extension_ ? extension_len_ : 0) + extension_offset; + + payload_size_ = size_ - (12 + payload_offset); + // payload_ = new uint8_t[payload_size_]; + // memcpy(payload_, buffer_ + 12 + payload_offset, payload_size_); + payload_ = buffer_ + 12 + payload_offset; + + return payload_; } \ No newline at end of file diff --git a/src/rtp/rtp_packet.h b/src/rtp/rtp_packet.h index f78936f..f2c6528 100644 --- a/src/rtp/rtp_packet.h +++ b/src/rtp/rtp_packet.h @@ -16,7 +16,7 @@ // | Contributing source (CSRC) identifiers | // | .... | // +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -// | header eXtension profile id | length in 32bits | +// | defined by profile | length | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // | Extensions | // | .... | @@ -29,15 +29,17 @@ class RtpPacket { public: + RtpPacket(); RtpPacket(const uint8_t *buffer, size_t size); - RtpPacket(const RtpPacket &rhs); - RtpPacket() {} + RtpPacket(const RtpPacket &rtp_packet); + RtpPacket &operator=(const RtpPacket &rtp_packet); + RtpPacket(RtpPacket &&rtp_packet); ~RtpPacket(); public: - // Header - void SetMarker(bool marker) { marker_ = marker; } + // Set Header + void SetMarker(bool has_marker) { marker_ = has_marker; } void SetPayloadType(uint8_t payload_type) { payload_type_ = payload_type; } void SetSequenceNumber(uint16_t sequence_number) { sequence_number_ = sequence_number; @@ -45,56 +47,65 @@ class RtpPacket { void SetTimestamp(uint32_t timestamp) { timestamp_ = timestamp; } void SetSsrc(uint32_t ssrc) { ssrc_ = ssrc; } void SetCsrcs(){}; - void SetHeadersSize(size_t payload_offset) { - payload_offset_ = payload_offset; - } + void SetHasExtension(bool has_extension) { has_extension_ = has_extension; }; - void SetExtensionID(uint16_t extension_id) { extension_id_ = extension_id; } - void SetExtensionData(uint8_t *extension_data) { - extension_data_ = extension_data; + void SetExtensionProfile(uint16_t extension_profile) { + extension_profile_ = extension_profile; + } + void SetExtensionData(uint8_t *extension_data, size_t extension_len) { + extension_len_ = extension_len; + extension_data_ = new uint8_t[extension_len_]; + memcpy(extension_data_, extension_data, extension_len_); } - // Payload - void SetPayload(uint8_t *payload) { - payload_ = new uint8_t[payload_size_]; - memcpy(payload_, payload, payload_size_); - }; - void SetPayloadSize(size_t payload_size) { payload_size_ = payload_size; }; - void SetPaddingSize(size_t padding_size) { padding_size_ = padding_size; } + public: + const uint8_t *Encode(uint8_t *payload, size_t payload_size); + const uint8_t *Decode(); - // Header + public: + // Get Header bool Marker() const { return marker_; } uint8_t PayloadType() const { return payload_type_; } uint16_t SequenceNumber() const { return sequence_number_; } uint32_t Timestamp() const { return timestamp_; } uint32_t Ssrc() const { return ssrc_; } std::vector Csrcs() const; - size_t HeadersSize() const { return payload_offset_; } bool HasExtension() const { return has_extension_; }; - uint16_t ExtensionID() const { return extension_id_; } + uint16_t ExtensionProfile() const { return extension_profile_; } uint8_t *ExtensionData() const { return extension_data_; } // Payload uint8_t *Payload() const { return payload_; }; size_t PayloadSize() const { return payload_size_; } - size_t PaddingSize() const { return padding_size_; } + + public: + const uint8_t *Buffer() { return buffer_; } + const size_t Size() { return size_; } private: // Header + uint32_t version_ = 0; + bool has_padding_ = false; + bool has_extension_ = false; + uint32_t total_csrc_number_ = 0; bool marker_ = false; uint8_t payload_type_ = 0; uint16_t sequence_number_ = 0; uint32_t timestamp_ = 0; uint32_t ssrc_ = 0; - size_t payload_offset_ = 0; - bool has_extension_ = false; - uint16_t extension_id_ = 0; + std::vector csrcs_; + uint16_t profile_ = 0; + uint16_t extension_profile_ = 0; + uint16_t extension_len_ = 0; uint8_t *extension_data_ = nullptr; // Payload uint8_t *payload_ = nullptr; size_t payload_size_ = 0; - uint8_t padding_size_; + + // Entire RTP buffer + uint8_t *buffer_ = nullptr; + size_t size_ = 0; }; #endif \ No newline at end of file diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index 8d4de2c..8469df4 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -66,17 +66,21 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { ikcp_update(kcp, clock); if (!send_ringbuffer_.isEmpty()) { - RingBuffer::Data buffer; + // Data buffer; + RtpPacket buffer; if (ikcp_waitsnd(kcp) <= kcp->snd_wnd * 2) { send_ringbuffer_.pop(buffer); - ret = ikcp_send(kcp, buffer.data(), buffer.size()); + // ret = ikcp_send(kcp, buffer.data(), buffer.size()); + ret = ikcp_send(kcp, (const char *)buffer.Buffer(), buffer.Size()); } } if (!recv_ringbuffer_.isEmpty()) { - RingBuffer::Data buffer; + // Data buffer; + RtpPacket buffer; recv_ringbuffer_.pop(buffer); - ret = ikcp_input(kcp, buffer.data(), buffer.size()); + // ret = ikcp_input(kcp, buffer.data(), buffer.size()); + ret = ikcp_input(kcp, (const char *)buffer.Buffer(), buffer.Size()); } int len = 0; @@ -145,8 +149,11 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { IceTransmission *ice_transmission_obj = static_cast(user_ptr); if (ice_transmission_obj->on_receive_ice_msg_cb_) { - ice_transmission_obj->recv_ringbuffer_.push( - std::move(RingBuffer::Data(data, size))); + // ice_transmission_obj->recv_ringbuffer_.push( + // std::move(Data(data, size))); + + RtpPacket packet((uint8_t *)data, size); + ice_transmission_obj->recv_ringbuffer_.push(packet); // int ret = ikcp_input(ice_transmission_obj->kcp_, data, size); // ikcp_update(ice_transmission_obj->kcp_, iclock()); @@ -258,7 +265,18 @@ int IceTransmission::SendAnswer() { int IceTransmission::SendData(const char *data, size_t size) { if (JUICE_STATE_COMPLETED == state_) { - send_ringbuffer_.push(std::move(RingBuffer::Data(data, size))); + // send_ringbuffer_.push(std::move(Data(data, size))); + + int num = 0; + for (num = 0; num * 1400 < size; num++) { + RtpPacket packet((uint8_t *)(data + num * 1400), 1400); + send_ringbuffer_.push(packet); + } + + if (size - num * 1400 > 0) { + RtpPacket packet((uint8_t *)(data + num * 1400), size - num * 1400); + send_ringbuffer_.push(packet); + } } return 0; } \ No newline at end of file diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h index 1999bd3..6fb884c 100644 --- a/src/transmission/ice_transmission.h +++ b/src/transmission/ice_transmission.h @@ -6,6 +6,7 @@ #include "congestion_control.h" #include "ice_agent.h" #include "ringbuffer.h" +#include "rtp_packet.h" #include "ws_transmission.h" class IceTransmission { public: @@ -67,8 +68,8 @@ class IceTransmission { // ikcpcb *kcp_ = nullptr; char kcp_complete_buffer_[2560 * 1440 * 4]; std::mutex mtx_; - RingBuffer send_ringbuffer_; - RingBuffer recv_ringbuffer_; + RingBuffer send_ringbuffer_; + RingBuffer recv_ringbuffer_; bool kcp_stop_ = false; std::thread *kcp_update_thread_ = nullptr; }; diff --git a/xmake.lua b/xmake.lua index 8a21abf..e640682 100644 --- a/xmake.lua +++ b/xmake.lua @@ -46,6 +46,12 @@ target("inih") target("ringbuffer") set_kind("headeronly") add_includedirs("src/ringbuffer", {public = true}) + +target("rtp") + set_kind("static") + add_deps("log") + add_files("src/rtp/*.cpp") + add_includedirs("src/rtp", {public = true}) target("ice") set_kind("static") @@ -83,7 +89,7 @@ target("qos") target("transmission") set_kind("static") - add_deps("log", "ws", "ice", "qos", "ringbuffer") + add_deps("log", "ws", "ice", "qos", "ringbuffer", "rtp") add_files("src/transmission/*.cpp") add_packages("asio", "nlohmann_json") add_includedirs("src/ws", "src/ice", "src/qos", {public = true})