diff --git a/src/channel/rtp_channel/rtp_video_receiver.cpp b/src/channel/rtp_channel/rtp_video_receiver.cpp index 464ff5f..0da9ec4 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.cpp +++ b/src/channel/rtp_channel/rtp_video_receiver.cpp @@ -1,18 +1,22 @@ #include "rtp_video_receiver.h" +#include "common.h" #include "log.h" +#include "rtcp_sender.h" #define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2) #define RTCP_RR_INTERVAL 1000 RtpVideoReceiver::RtpVideoReceiver() - : receive_side_congestion_controller_( + : feedback_ssrc_(GenerateUniqueSsrc()), + receive_side_congestion_controller_( [this](std::vector> packets) { SendCombinedRtcpPacket(std::move(packets)); }) {} RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr io_statistics) : io_statistics_(io_statistics), + feedback_ssrc_(GenerateUniqueSsrc()), receive_side_congestion_controller_( [this](std::vector> packets) { SendCombinedRtcpPacket(std::move(packets)); @@ -31,6 +35,8 @@ RtpVideoReceiver::~RtpVideoReceiver() { if (rtcp_thread_.joinable()) { rtcp_thread_.join(); } + + SSRCManager::Instance().DeleteSsrc(feedback_ssrc_); } void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { @@ -347,17 +353,23 @@ int RtpVideoReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) { } void RtpVideoReceiver::SendCombinedRtcpPacket( - std::vector> packets) { + std::vector> rtcp_packets) { if (!data_send_func_) { LOG_ERROR("data_send_func_ is nullptr"); } - LOG_ERROR("Send combined rtcp packet"); + // LOG_ERROR("Send combined rtcp packet"); - for (auto& packet : packets) { - if (data_send_func_((const char*)packet->Buffer(), packet->Size())) { - LOG_ERROR("Send CCB failed"); - } + RTCPSender rtcp_sender( + [this](const uint8_t* buffer, size_t size) -> int { + return data_send_func_((const char*)buffer, size); + }, + IP_PACKET_SIZE); + + for (auto& rtcp_packet : rtcp_packets) { + rtcp_packet->SetSenderSsrc(feedback_ssrc_); + rtcp_sender.AppendPacket(*rtcp_packet); + rtcp_sender.Send(); } } diff --git a/src/channel/rtp_channel/rtp_video_receiver.h b/src/channel/rtp_channel/rtp_video_receiver.h index 9144ea3..883a6c1 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.h +++ b/src/channel/rtp_channel/rtp_video_receiver.h @@ -44,7 +44,8 @@ class RtpVideoReceiver : public ThreadBase { bool CheckIsTimeSendRR(); int SendRtcpRR(RtcpReceiverReport& rtcp_rr); - void SendCombinedRtcpPacket(std::vector> packets); + void SendCombinedRtcpPacket( + std::vector> rtcp_packets); private: bool Process() override; @@ -89,6 +90,7 @@ class RtpVideoReceiver : public ThreadBase { private: ReceiveSideCongestionController receive_side_congestion_controller_; + uint32_t feedback_ssrc_ = 0; }; #endif diff --git a/src/common/array_view.h b/src/common/array_view.h deleted file mode 100644 index a0febdc..0000000 --- a/src/common/array_view.h +++ /dev/null @@ -1,257 +0,0 @@ -/* - * @Author: DI JUNKUN - * @Date: 2024-12-18 - * Copyright 2018 The WebRTC project authors. All Rights Reserved. - */ - -#ifndef _ARRAY_VIEW_H_ -#define _ARRAY_VIEW_H_ - -#include -#include -#include -#include -#include - -#include "type_traits.h" - -namespace array_view_internal { - -// Magic constant for indicating that the size of an ArrayView is variable -// instead of fixed. -enum : std::ptrdiff_t { kArrayViewVarSize = -4711 }; - -// Base class for ArrayViews of fixed nonzero size. -template -class ArrayViewBase { - static_assert(Size > 0, "ArrayView size must be variable or non-negative"); - - public: - ArrayViewBase(T* data, size_t /* size */) : data_(data) {} - - static constexpr size_t size() { return Size; } - static constexpr bool empty() { return false; } - T* data() const { return data_; } - - protected: - static constexpr bool fixed_size() { return true; } - - private: - T* data_; -}; - -// Specialized base class for ArrayViews of fixed zero size. -template -class ArrayViewBase { - public: - explicit ArrayViewBase(T* /* data */, size_t /* size */) {} - - static constexpr size_t size() { return 0; } - static constexpr bool empty() { return true; } - T* data() const { return nullptr; } - - protected: - static constexpr bool fixed_size() { return true; } -}; - -// Specialized base class for ArrayViews of variable size. -template -class ArrayViewBase { - public: - ArrayViewBase(T* data, size_t size) - : data_(size == 0 ? nullptr : data), size_(size) {} - - size_t size() const { return size_; } - bool empty() const { return size_ == 0; } - T* data() const { return data_; } - - protected: - static constexpr bool fixed_size() { return false; } - - private: - T* data_; - size_t size_; -}; - -} // namespace array_view_internal - -template -class ArrayView final : public array_view_internal::ArrayViewBase { - public: - using value_type = T; - using reference = value_type&; - using const_reference = const value_type&; - using pointer = value_type*; - using const_pointer = const value_type*; - using const_iterator = const T*; - - // Construct an ArrayView from a pointer and a length. - template - ArrayView(U* data, size_t size) - : array_view_internal::ArrayViewBase::ArrayViewBase(data, size) { - // RTC_DCHECK_EQ(size == 0 ? nullptr : data, this->data()); - // RTC_DCHECK_EQ(size, this->size()); - // RTC_DCHECK_EQ(!this->data(), - // this->size() == 0); // data is null iff size == 0. - } - - // Construct an empty ArrayView. Note that fixed-size ArrayViews of size > 0 - // cannot be empty. - ArrayView() : ArrayView(nullptr, 0) {} - ArrayView(std::nullptr_t) // NOLINT - : ArrayView() {} - ArrayView(std::nullptr_t, size_t size) - : ArrayView(static_cast(nullptr), size) { - static_assert(Size == 0 || Size == array_view_internal::kArrayViewVarSize, - ""); - // RTC_DCHECK_EQ(0, size); - } - - // Construct an ArrayView from a C-style array. - template - ArrayView(U (&array)[N]) // NOLINT - : ArrayView(array, N) { - static_assert(Size == N || Size == array_view_internal::kArrayViewVarSize, - "Array size must match ArrayView size"); - } - - // (Only if size is fixed.) Construct a fixed size ArrayView from a - // non-const std::array instance. For an ArrayView with variable size, the - // used ctor is ArrayView(U& u) instead. - template (N)>::type* = nullptr> - ArrayView(std::array& u) // NOLINT - : ArrayView(u.data(), u.size()) {} - - // (Only if size is fixed.) Construct a fixed size ArrayView where T is - // const from a const(expr) std::array instance. For an ArrayView with - // variable size, the used ctor is ArrayView(U& u) instead. - template (N)>::type* = nullptr> - ArrayView(const std::array& u) // NOLINT - : ArrayView(u.data(), u.size()) {} - - // (Only if size is fixed.) Construct an ArrayView from any type U that has a - // static constexpr size() method whose return value is equal to Size, and a - // data() method whose return value converts implicitly to T*. In particular, - // this means we allow conversion from ArrayView to ArrayView, but not the other way around. We also don't allow conversion from - // ArrayView to ArrayView, or from ArrayView to ArrayView when M != N. - template ::value>::type* = nullptr> - ArrayView(U& u) // NOLINT - : ArrayView(u.data(), u.size()) { - static_assert(U::size() == Size, "Sizes must match exactly"); - } - template ::value>::type* = nullptr> - ArrayView(const U& u) // NOLINT(runtime/explicit) - : ArrayView(u.data(), u.size()) { - static_assert(U::size() == Size, "Sizes must match exactly"); - } - - // (Only if size is variable.) Construct an ArrayView from any type U that - // has a size() method whose return value converts implicitly to size_t, and - // a data() method whose return value converts implicitly to T*. In - // particular, this means we allow conversion from ArrayView to - // ArrayView, but not the other way around. Other allowed - // conversions include - // ArrayView to ArrayView or ArrayView, - // std::vector to ArrayView or ArrayView, - // const std::vector to ArrayView, - // rtc::Buffer to ArrayView or ArrayView, and - // const rtc::Buffer to ArrayView. - template ::value>::type* = nullptr> - ArrayView(U& u) // NOLINT - : ArrayView(u.data(), u.size()) {} - template ::value>::type* = nullptr> - ArrayView(const U& u) // NOLINT(runtime/explicit) - : ArrayView(u.data(), u.size()) {} - - // Indexing and iteration. These allow mutation even if the ArrayView is - // const, because the ArrayView doesn't own the array. (To prevent mutation, - // use a const element type.) - T& operator[](size_t idx) const { - // RTC_DCHECK_LT(idx, this->size()); - // RTC_DCHECK(this->data()); - return this->data()[idx]; - } - T* begin() const { return this->data(); } - T* end() const { return this->data() + this->size(); } - const T* cbegin() const { return this->data(); } - const T* cend() const { return this->data() + this->size(); } - std::reverse_iterator rbegin() const { - return std::make_reverse_iterator(end()); - } - std::reverse_iterator rend() const { - return std::make_reverse_iterator(begin()); - } - std::reverse_iterator crbegin() const { - return std::make_reverse_iterator(cend()); - } - std::reverse_iterator crend() const { - return std::make_reverse_iterator(cbegin()); - } - - ArrayView subview(size_t offset, size_t size) const { - return offset < this->size() - ? ArrayView(this->data() + offset, - std::min(size, this->size() - offset)) - : ArrayView(); - } - ArrayView subview(size_t offset) const { - return subview(offset, this->size()); - } -}; - -// Comparing two ArrayViews compares their (pointer,size) pairs; it does *not* -// dereference the pointers. -template -bool operator==(const ArrayView& a, const ArrayView& b) { - return a.data() == b.data() && a.size() == b.size(); -} -template -bool operator!=(const ArrayView& a, const ArrayView& b) { - return !(a == b); -} - -// Variable-size ArrayViews are the size of two pointers; fixed-size ArrayViews -// are the size of one pointer. (And as a special case, fixed-size ArrayViews -// of size 0 require no storage.) -static_assert(sizeof(ArrayView) == 2 * sizeof(int*), ""); -static_assert(sizeof(ArrayView) == sizeof(int*), ""); -static_assert(std::is_empty>::value, ""); - -template -inline ArrayView MakeArrayView(T* data, size_t size) { - return ArrayView(data, size); -} - -// Only for primitive types that have the same size and aligment. -// Allow reinterpret cast of the array view to another primitive type of the -// same size. -// Template arguments order is (U, T, Size) to allow deduction of the template -// arguments in client calls: reinterpret_array_view(array_view). -template -inline ArrayView reinterpret_array_view(ArrayView view) { - static_assert(sizeof(U) == sizeof(T) && alignof(U) == alignof(T), - "ArrayView reinterpret_cast is only supported for casting " - "between views that represent the same chunk of memory."); - static_assert( - std::is_fundamental::value && std::is_fundamental::value, - "ArrayView reinterpret_cast is only supported for casting between " - "fundamental types."); - return ArrayView(reinterpret_cast(view.data()), view.size()); -} - -#endif \ No newline at end of file diff --git a/src/common/common.h b/src/common/common.h index 42fb278..c24523e 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -2,6 +2,9 @@ #define _COMMON_H_ #include +#include +#include +#include int CommonDummy(); @@ -32,4 +35,54 @@ inline const std::string GetIceUsername(const std::string &sdp) { return result; } +// SSRCManager is used to manage the SSRCs that have been used. + +class SSRCManager { + public: + static SSRCManager &Instance() { + static SSRCManager instance; + return instance; + } + + void AddSsrc(uint32_t ssrc) { + std::lock_guard lock(mutex_); + ssrcs_.insert(ssrc); + } + + void DeleteSsrc(uint32_t ssrc) { + std::lock_guard lock(mutex_); + ssrcs_.erase(ssrc); + } + + bool Contains(uint32_t ssrc) { + std::lock_guard lock(mutex_); + return ssrcs_.count(ssrc) > 0; + } + + private: + SSRCManager() = default; + ~SSRCManager() = default; + SSRCManager(const SSRCManager &) = delete; + SSRCManager &operator=(const SSRCManager &) = delete; + + std::unordered_set ssrcs_; + std::mutex mutex_; +}; + +inline uint32_t GenerateRandomSSRC() { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dis(1, 0xFFFFFFFF); + return dis(gen); +} + +inline uint32_t GenerateUniqueSsrc() { + uint32_t new_ssrc; + do { + new_ssrc = GenerateRandomSSRC(); + } while (SSRCManager::Instance().Contains(new_ssrc)); + SSRCManager::Instance().AddSsrc(new_ssrc); + return new_ssrc; +} + #endif \ No newline at end of file diff --git a/src/qos/congestion_control_feedback.cpp b/src/qos/congestion_control_feedback.cpp index c7e95d3..f3949bf 100644 --- a/src/qos/congestion_control_feedback.cpp +++ b/src/qos/congestion_control_feedback.cpp @@ -6,10 +6,10 @@ #include #include -#include "array_view.h" #include "byte_io.h" #include "log.h" +// rfc8888 - RTP Congestion Control Feedback /* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |V=2|P| FMT=11 | PT = 205 | length | @@ -38,6 +38,23 @@ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ */ +// for this implementation, only one stream is supported. +/* + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P| FMT=11 | PT = 205 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of RTCP packet sender | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of 1st RTP Stream | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | begin_seq | num_reports | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |R|ECN| Arrival time offset | ... . + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Report Timestamp (32 bits) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ + namespace { constexpr size_t kSenderSsrcLength = 4; @@ -128,9 +145,9 @@ bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position, size_t max_length, PacketReadyCallback callback) const { // Ensure there is enough room for this packet. - while (*position + BlockLength() > max_length) { - if (!OnBufferFull(buffer, position, callback)) return false; - } + // while (*position + BlockLength() > max_length) { + // if (!OnBufferFull(buffer, position, callback)) return false; + // } const size_t position_end = *position + BlockLength(); // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -144,14 +161,14 @@ bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position, *position += 4; // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // | SSRC of nth RTP Stream | + // | SSRC of 1st RTP Stream | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // | begin_seq | num_reports | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // |R|ECN| Arrival time offset | ... . // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // . . - auto write_report_for_ssrc = [&](ArrayView packets) { + + auto write_report_for_ssrc = [&](std::vector packets) { // SSRC of nth RTP stream. ByteWriter::WriteBigEndian(&buffer[*position], packets[0].ssrc); *position += 4; @@ -201,18 +218,13 @@ bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position, return true; }; - ArrayView remaining(packets_); - while (!remaining.empty()) { + if (!packets_.empty()) { int number_of_packets_for_ssrc = 0; - uint32_t ssrc = remaining[0].ssrc; - for (const PacketInfo& packet_info : remaining) { - if (packet_info.ssrc != ssrc) { - break; - } + uint32_t ssrc = packets_[0].ssrc; + for (const PacketInfo& packet_info : packets_) { ++number_of_packets_for_ssrc; } - write_report_for_ssrc(remaining.subview(0, number_of_packets_for_ssrc)); - remaining = remaining.subview(number_of_packets_for_ssrc); + write_report_for_ssrc(packets_); } // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -245,15 +257,6 @@ size_t CongestionControlFeedback::BlockLength() const { uint32_t ssrc = packets_.front().ssrc; uint16_t first_sequence_number = packets_.front().sequence_number; - for (size_t i = 0; i < packets_.size(); ++i) { - if (packets_[i].ssrc != ssrc) { - uint16_t number_of_packets = - packets_[i - 1].sequence_number - first_sequence_number + 1; - total_size += increase_size_per_ssrc(number_of_packets); - ssrc = packets_[i].ssrc; - first_sequence_number = packets_[i].sequence_number; - } - } uint16_t number_of_packets = packets_.back().sequence_number - first_sequence_number + 1; total_size += increase_size_per_ssrc(number_of_packets); diff --git a/src/qos/congestion_control_feedback.h b/src/qos/congestion_control_feedback.h index 502fb33..fc3869c 100644 --- a/src/qos/congestion_control_feedback.h +++ b/src/qos/congestion_control_feedback.h @@ -12,7 +12,6 @@ #include #include -#include "array_view.h" #include "enc_mark.h" #include "rtp_feedback.h" @@ -40,7 +39,7 @@ class CongestionControlFeedback : public RtpFeedback { bool Parse(const RtcpCommonHeader& packet); - ArrayView packets() const { return packets_; } + std::vector packets() const { return packets_; } uint32_t report_timestamp_compact_ntp() const { return report_timestamp_compact_ntp_; diff --git a/src/rtcp/rtcp_sender/rtcp_sender.h b/src/rtcp/rtcp_sender/rtcp_sender.h new file mode 100644 index 0000000..1800ebc --- /dev/null +++ b/src/rtcp/rtcp_sender/rtcp_sender.h @@ -0,0 +1,53 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-01-10 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _RTCP_SENDER_H_ +#define _RTCP_SENDER_H_ + +#define IP_PACKET_SIZE 1500 + +#include +#include + +#include "log.h" + +class RTCPSender { + public: + RTCPSender(std::function callback, + size_t max_packet_size) + : callback_(callback), max_packet_size_(max_packet_size) { + if (max_packet_size > IP_PACKET_SIZE) { + LOG_ERROR("max_packet_size must be less than IP_PACKET_SIZE"); + } + } + ~RTCPSender() { + if (index_ == 0) { + LOG_ERROR("Unsent rtcp packet"); + } + } + + // Appends a packet to pending compound packet. + // Sends rtcp packet if buffer is full and resets the buffer. + void AppendPacket(const RtcpPacket& packet) { + packet.Create(buffer_, &index_, max_packet_size_, callback_); + } + + // Sends pending rtcp packet. + void Send() { + if (index_ > 0 && callback_) { + callback_(buffer_, index_); + index_ = 0; + } + } + + private: + std::function callback_ = nullptr; + const size_t max_packet_size_; + size_t index_ = 0; + uint8_t buffer_[IP_PACKET_SIZE]; +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_packet/rtp_codec.cpp b/src/rtp/rtp_packet/rtp_codec.cpp index 5cef668..968217d 100644 --- a/src/rtp/rtp_packet/rtp_codec.cpp +++ b/src/rtp/rtp_packet/rtp_codec.cpp @@ -1,8 +1,8 @@ #include "rtp_codec.h" #include -#include +#include "common.h" #include "log.h" #include "obu_parser.h" @@ -15,22 +15,6 @@ constexpr int kObuTypeSequenceHeader = 1; using namespace obu; -uint32_t GenerateRandomSSRC() { - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution dis(1, 0xFFFFFFFF); - return dis(gen); -} - -uint32_t GenerateUniqueSsrc() { - uint32_t new_ssrc; - do { - new_ssrc = GenerateRandomSSRC(); - } while (SSRCManager::Instance().Contains(new_ssrc)); - SSRCManager::Instance().AddSsrc(new_ssrc); - return new_ssrc; -} - RtpCodec::RtpCodec(RtpPacket::PAYLOAD_TYPE payload_type) : version_(RTP_VERSION), has_padding_(false), diff --git a/src/rtp/rtp_packet/rtp_codec.h b/src/rtp/rtp_packet/rtp_codec.h index 027ed27..f61ea26 100644 --- a/src/rtp/rtp_packet/rtp_codec.h +++ b/src/rtp/rtp_packet/rtp_codec.h @@ -3,7 +3,6 @@ #include #include -#include #include #include "fec_encoder.h" @@ -56,36 +55,4 @@ class RtpCodec { FecEncoder fec_encoder_; }; -class SSRCManager { - public: - static SSRCManager& Instance() { - static SSRCManager instance; - return instance; - } - - void AddSsrc(uint32_t ssrc) { - std::lock_guard lock(mutex_); - ssrcs_.insert(ssrc); - } - - void DeleteSsrc(uint32_t ssrc) { - std::lock_guard lock(mutex_); - ssrcs_.erase(ssrc); - } - - bool Contains(uint32_t ssrc) { - std::lock_guard lock(mutex_); - return ssrcs_.count(ssrc) > 0; - } - - private: - SSRCManager() = default; - ~SSRCManager() = default; - SSRCManager(const SSRCManager&) = delete; - SSRCManager& operator=(const SSRCManager&) = delete; - - std::unordered_set ssrcs_; - std::mutex mutex_; -}; - #endif \ No newline at end of file diff --git a/xmake.lua b/xmake.lua index 3e77c30..e8a83c4 100644 --- a/xmake.lua +++ b/xmake.lua @@ -114,6 +114,7 @@ target("rtcp") "src/rtcp/rtp_feedback/*.cpp") add_includedirs("src/rtcp", "src/rtcp/rtcp_packet", + "src/rtcp/rtcp_sender", "src/rtcp/rtp_feedback", {public = true}) target("qos")