[feat] congestion control feedback sending support

This commit is contained in:
dijunkun
2025-01-10 17:21:03 +08:00
parent 49b74ffcd6
commit 63ed77e43a
10 changed files with 159 additions and 342 deletions

View File

@@ -1,18 +1,22 @@
#include "rtp_video_receiver.h"
#include "common.h"
#include "log.h"
#include "rtcp_sender.h"
#define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2)
#define RTCP_RR_INTERVAL 1000
RtpVideoReceiver::RtpVideoReceiver()
: receive_side_congestion_controller_(
: feedback_ssrc_(GenerateUniqueSsrc()),
receive_side_congestion_controller_(
[this](std::vector<std::unique_ptr<RtcpPacket>> packets) {
SendCombinedRtcpPacket(std::move(packets));
}) {}
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics),
feedback_ssrc_(GenerateUniqueSsrc()),
receive_side_congestion_controller_(
[this](std::vector<std::unique_ptr<RtcpPacket>> packets) {
SendCombinedRtcpPacket(std::move(packets));
@@ -31,6 +35,8 @@ RtpVideoReceiver::~RtpVideoReceiver() {
if (rtcp_thread_.joinable()) {
rtcp_thread_.join();
}
SSRCManager::Instance().DeleteSsrc(feedback_ssrc_);
}
void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
@@ -347,17 +353,23 @@ int RtpVideoReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) {
}
void RtpVideoReceiver::SendCombinedRtcpPacket(
std::vector<std::unique_ptr<RtcpPacket>> packets) {
std::vector<std::unique_ptr<RtcpPacket>> rtcp_packets) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
}
LOG_ERROR("Send combined rtcp packet");
// LOG_ERROR("Send combined rtcp packet");
for (auto& packet : packets) {
if (data_send_func_((const char*)packet->Buffer(), packet->Size())) {
LOG_ERROR("Send CCB failed");
}
RTCPSender rtcp_sender(
[this](const uint8_t* buffer, size_t size) -> int {
return data_send_func_((const char*)buffer, size);
},
IP_PACKET_SIZE);
for (auto& rtcp_packet : rtcp_packets) {
rtcp_packet->SetSenderSsrc(feedback_ssrc_);
rtcp_sender.AppendPacket(*rtcp_packet);
rtcp_sender.Send();
}
}

View File

@@ -44,7 +44,8 @@ class RtpVideoReceiver : public ThreadBase {
bool CheckIsTimeSendRR();
int SendRtcpRR(RtcpReceiverReport& rtcp_rr);
void SendCombinedRtcpPacket(std::vector<std::unique_ptr<RtcpPacket>> packets);
void SendCombinedRtcpPacket(
std::vector<std::unique_ptr<RtcpPacket>> rtcp_packets);
private:
bool Process() override;
@@ -89,6 +90,7 @@ class RtpVideoReceiver : public ThreadBase {
private:
ReceiveSideCongestionController receive_side_congestion_controller_;
uint32_t feedback_ssrc_ = 0;
};
#endif

View File

@@ -1,257 +0,0 @@
/*
* @Author: DI JUNKUN
* @Date: 2024-12-18
* Copyright 2018 The WebRTC project authors. All Rights Reserved.
*/
#ifndef _ARRAY_VIEW_H_
#define _ARRAY_VIEW_H_
#include <algorithm>
#include <array>
#include <cstddef>
#include <iterator>
#include <type_traits>
#include "type_traits.h"
namespace array_view_internal {
// Magic constant for indicating that the size of an ArrayView is variable
// instead of fixed.
enum : std::ptrdiff_t { kArrayViewVarSize = -4711 };
// Base class for ArrayViews of fixed nonzero size.
template <typename T, std::ptrdiff_t Size>
class ArrayViewBase {
static_assert(Size > 0, "ArrayView size must be variable or non-negative");
public:
ArrayViewBase(T* data, size_t /* size */) : data_(data) {}
static constexpr size_t size() { return Size; }
static constexpr bool empty() { return false; }
T* data() const { return data_; }
protected:
static constexpr bool fixed_size() { return true; }
private:
T* data_;
};
// Specialized base class for ArrayViews of fixed zero size.
template <typename T>
class ArrayViewBase<T, 0> {
public:
explicit ArrayViewBase(T* /* data */, size_t /* size */) {}
static constexpr size_t size() { return 0; }
static constexpr bool empty() { return true; }
T* data() const { return nullptr; }
protected:
static constexpr bool fixed_size() { return true; }
};
// Specialized base class for ArrayViews of variable size.
template <typename T>
class ArrayViewBase<T, array_view_internal::kArrayViewVarSize> {
public:
ArrayViewBase(T* data, size_t size)
: data_(size == 0 ? nullptr : data), size_(size) {}
size_t size() const { return size_; }
bool empty() const { return size_ == 0; }
T* data() const { return data_; }
protected:
static constexpr bool fixed_size() { return false; }
private:
T* data_;
size_t size_;
};
} // namespace array_view_internal
template <typename T,
std::ptrdiff_t Size = array_view_internal::kArrayViewVarSize>
class ArrayView final : public array_view_internal::ArrayViewBase<T, Size> {
public:
using value_type = T;
using reference = value_type&;
using const_reference = const value_type&;
using pointer = value_type*;
using const_pointer = const value_type*;
using const_iterator = const T*;
// Construct an ArrayView from a pointer and a length.
template <typename U>
ArrayView(U* data, size_t size)
: array_view_internal::ArrayViewBase<T, Size>::ArrayViewBase(data, size) {
// RTC_DCHECK_EQ(size == 0 ? nullptr : data, this->data());
// RTC_DCHECK_EQ(size, this->size());
// RTC_DCHECK_EQ(!this->data(),
// this->size() == 0); // data is null iff size == 0.
}
// Construct an empty ArrayView. Note that fixed-size ArrayViews of size > 0
// cannot be empty.
ArrayView() : ArrayView(nullptr, 0) {}
ArrayView(std::nullptr_t) // NOLINT
: ArrayView() {}
ArrayView(std::nullptr_t, size_t size)
: ArrayView(static_cast<T*>(nullptr), size) {
static_assert(Size == 0 || Size == array_view_internal::kArrayViewVarSize,
"");
// RTC_DCHECK_EQ(0, size);
}
// Construct an ArrayView from a C-style array.
template <typename U, size_t N>
ArrayView(U (&array)[N]) // NOLINT
: ArrayView(array, N) {
static_assert(Size == N || Size == array_view_internal::kArrayViewVarSize,
"Array size must match ArrayView size");
}
// (Only if size is fixed.) Construct a fixed size ArrayView<T, N> from a
// non-const std::array instance. For an ArrayView with variable size, the
// used ctor is ArrayView(U& u) instead.
template <typename U, size_t N,
typename std::enable_if<
Size == static_cast<std::ptrdiff_t>(N)>::type* = nullptr>
ArrayView(std::array<U, N>& u) // NOLINT
: ArrayView(u.data(), u.size()) {}
// (Only if size is fixed.) Construct a fixed size ArrayView<T, N> where T is
// const from a const(expr) std::array instance. For an ArrayView with
// variable size, the used ctor is ArrayView(U& u) instead.
template <typename U, size_t N,
typename std::enable_if<
Size == static_cast<std::ptrdiff_t>(N)>::type* = nullptr>
ArrayView(const std::array<U, N>& u) // NOLINT
: ArrayView(u.data(), u.size()) {}
// (Only if size is fixed.) Construct an ArrayView from any type U that has a
// static constexpr size() method whose return value is equal to Size, and a
// data() method whose return value converts implicitly to T*. In particular,
// this means we allow conversion from ArrayView<T, N> to ArrayView<const T,
// N>, but not the other way around. We also don't allow conversion from
// ArrayView<T> to ArrayView<T, N>, or from ArrayView<T, M> to ArrayView<T,
// N> when M != N.
template <typename U, typename std::enable_if<
Size != array_view_internal::kArrayViewVarSize &&
HasDataAndSize<U, T>::value>::type* = nullptr>
ArrayView(U& u) // NOLINT
: ArrayView(u.data(), u.size()) {
static_assert(U::size() == Size, "Sizes must match exactly");
}
template <typename U, typename std::enable_if<
Size != array_view_internal::kArrayViewVarSize &&
HasDataAndSize<U, T>::value>::type* = nullptr>
ArrayView(const U& u) // NOLINT(runtime/explicit)
: ArrayView(u.data(), u.size()) {
static_assert(U::size() == Size, "Sizes must match exactly");
}
// (Only if size is variable.) Construct an ArrayView from any type U that
// has a size() method whose return value converts implicitly to size_t, and
// a data() method whose return value converts implicitly to T*. In
// particular, this means we allow conversion from ArrayView<T> to
// ArrayView<const T>, but not the other way around. Other allowed
// conversions include
// ArrayView<T, N> to ArrayView<T> or ArrayView<const T>,
// std::vector<T> to ArrayView<T> or ArrayView<const T>,
// const std::vector<T> to ArrayView<const T>,
// rtc::Buffer to ArrayView<uint8_t> or ArrayView<const uint8_t>, and
// const rtc::Buffer to ArrayView<const uint8_t>.
template <typename U, typename std::enable_if<
Size == array_view_internal::kArrayViewVarSize &&
HasDataAndSize<U, T>::value>::type* = nullptr>
ArrayView(U& u) // NOLINT
: ArrayView(u.data(), u.size()) {}
template <typename U, typename std::enable_if<
Size == array_view_internal::kArrayViewVarSize &&
HasDataAndSize<U, T>::value>::type* = nullptr>
ArrayView(const U& u) // NOLINT(runtime/explicit)
: ArrayView(u.data(), u.size()) {}
// Indexing and iteration. These allow mutation even if the ArrayView is
// const, because the ArrayView doesn't own the array. (To prevent mutation,
// use a const element type.)
T& operator[](size_t idx) const {
// RTC_DCHECK_LT(idx, this->size());
// RTC_DCHECK(this->data());
return this->data()[idx];
}
T* begin() const { return this->data(); }
T* end() const { return this->data() + this->size(); }
const T* cbegin() const { return this->data(); }
const T* cend() const { return this->data() + this->size(); }
std::reverse_iterator<T*> rbegin() const {
return std::make_reverse_iterator(end());
}
std::reverse_iterator<T*> rend() const {
return std::make_reverse_iterator(begin());
}
std::reverse_iterator<const T*> crbegin() const {
return std::make_reverse_iterator(cend());
}
std::reverse_iterator<const T*> crend() const {
return std::make_reverse_iterator(cbegin());
}
ArrayView<T> subview(size_t offset, size_t size) const {
return offset < this->size()
? ArrayView<T>(this->data() + offset,
std::min(size, this->size() - offset))
: ArrayView<T>();
}
ArrayView<T> subview(size_t offset) const {
return subview(offset, this->size());
}
};
// Comparing two ArrayViews compares their (pointer,size) pairs; it does *not*
// dereference the pointers.
template <typename T, std::ptrdiff_t Size1, std::ptrdiff_t Size2>
bool operator==(const ArrayView<T, Size1>& a, const ArrayView<T, Size2>& b) {
return a.data() == b.data() && a.size() == b.size();
}
template <typename T, std::ptrdiff_t Size1, std::ptrdiff_t Size2>
bool operator!=(const ArrayView<T, Size1>& a, const ArrayView<T, Size2>& b) {
return !(a == b);
}
// Variable-size ArrayViews are the size of two pointers; fixed-size ArrayViews
// are the size of one pointer. (And as a special case, fixed-size ArrayViews
// of size 0 require no storage.)
static_assert(sizeof(ArrayView<int>) == 2 * sizeof(int*), "");
static_assert(sizeof(ArrayView<int, 17>) == sizeof(int*), "");
static_assert(std::is_empty<ArrayView<int, 0>>::value, "");
template <typename T>
inline ArrayView<T> MakeArrayView(T* data, size_t size) {
return ArrayView<T>(data, size);
}
// Only for primitive types that have the same size and aligment.
// Allow reinterpret cast of the array view to another primitive type of the
// same size.
// Template arguments order is (U, T, Size) to allow deduction of the template
// arguments in client calls: reinterpret_array_view<target_type>(array_view).
template <typename U, typename T, std::ptrdiff_t Size>
inline ArrayView<U, Size> reinterpret_array_view(ArrayView<T, Size> view) {
static_assert(sizeof(U) == sizeof(T) && alignof(U) == alignof(T),
"ArrayView reinterpret_cast is only supported for casting "
"between views that represent the same chunk of memory.");
static_assert(
std::is_fundamental<T>::value && std::is_fundamental<U>::value,
"ArrayView reinterpret_cast is only supported for casting between "
"fundamental types.");
return ArrayView<U, Size>(reinterpret_cast<U*>(view.data()), view.size());
}
#endif

View File

@@ -2,6 +2,9 @@
#define _COMMON_H_
#include <iostream>
#include <mutex>
#include <random>
#include <unordered_set>
int CommonDummy();
@@ -32,4 +35,54 @@ inline const std::string GetIceUsername(const std::string &sdp) {
return result;
}
// SSRCManager is used to manage the SSRCs that have been used.
class SSRCManager {
public:
static SSRCManager &Instance() {
static SSRCManager instance;
return instance;
}
void AddSsrc(uint32_t ssrc) {
std::lock_guard<std::mutex> lock(mutex_);
ssrcs_.insert(ssrc);
}
void DeleteSsrc(uint32_t ssrc) {
std::lock_guard<std::mutex> lock(mutex_);
ssrcs_.erase(ssrc);
}
bool Contains(uint32_t ssrc) {
std::lock_guard<std::mutex> lock(mutex_);
return ssrcs_.count(ssrc) > 0;
}
private:
SSRCManager() = default;
~SSRCManager() = default;
SSRCManager(const SSRCManager &) = delete;
SSRCManager &operator=(const SSRCManager &) = delete;
std::unordered_set<uint32_t> ssrcs_;
std::mutex mutex_;
};
inline uint32_t GenerateRandomSSRC() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<uint32_t> dis(1, 0xFFFFFFFF);
return dis(gen);
}
inline uint32_t GenerateUniqueSsrc() {
uint32_t new_ssrc;
do {
new_ssrc = GenerateRandomSSRC();
} while (SSRCManager::Instance().Contains(new_ssrc));
SSRCManager::Instance().AddSsrc(new_ssrc);
return new_ssrc;
}
#endif

View File

@@ -6,10 +6,10 @@
#include <utility>
#include <vector>
#include "array_view.h"
#include "byte_io.h"
#include "log.h"
// rfc8888 - RTP Congestion Control Feedback
/*
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P| FMT=11 | PT = 205 | length |
@@ -38,6 +38,23 @@
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
// for this implementation, only one stream is supported.
/*
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P| FMT=11 | PT = 205 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of RTCP packet sender |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of 1st RTP Stream |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| begin_seq | num_reports |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|R|ECN| Arrival time offset | ... .
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Report Timestamp (32 bits) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
namespace {
constexpr size_t kSenderSsrcLength = 4;
@@ -128,9 +145,9 @@ bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position,
size_t max_length,
PacketReadyCallback callback) const {
// Ensure there is enough room for this packet.
while (*position + BlockLength() > max_length) {
if (!OnBufferFull(buffer, position, callback)) return false;
}
// while (*position + BlockLength() > max_length) {
// if (!OnBufferFull(buffer, position, callback)) return false;
// }
const size_t position_end = *position + BlockLength();
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
@@ -144,14 +161,14 @@ bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position,
*position += 4;
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | SSRC of nth RTP Stream |
// | SSRC of 1st RTP Stream |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | begin_seq | num_reports |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |R|ECN| Arrival time offset | ... .
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// . .
auto write_report_for_ssrc = [&](ArrayView<const PacketInfo> packets) {
auto write_report_for_ssrc = [&](std::vector<PacketInfo> packets) {
// SSRC of nth RTP stream.
ByteWriter<uint32_t>::WriteBigEndian(&buffer[*position], packets[0].ssrc);
*position += 4;
@@ -201,18 +218,13 @@ bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position,
return true;
};
ArrayView<const PacketInfo> remaining(packets_);
while (!remaining.empty()) {
if (!packets_.empty()) {
int number_of_packets_for_ssrc = 0;
uint32_t ssrc = remaining[0].ssrc;
for (const PacketInfo& packet_info : remaining) {
if (packet_info.ssrc != ssrc) {
break;
}
uint32_t ssrc = packets_[0].ssrc;
for (const PacketInfo& packet_info : packets_) {
++number_of_packets_for_ssrc;
}
write_report_for_ssrc(remaining.subview(0, number_of_packets_for_ssrc));
remaining = remaining.subview(number_of_packets_for_ssrc);
write_report_for_ssrc(packets_);
}
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
@@ -245,15 +257,6 @@ size_t CongestionControlFeedback::BlockLength() const {
uint32_t ssrc = packets_.front().ssrc;
uint16_t first_sequence_number = packets_.front().sequence_number;
for (size_t i = 0; i < packets_.size(); ++i) {
if (packets_[i].ssrc != ssrc) {
uint16_t number_of_packets =
packets_[i - 1].sequence_number - first_sequence_number + 1;
total_size += increase_size_per_ssrc(number_of_packets);
ssrc = packets_[i].ssrc;
first_sequence_number = packets_[i].sequence_number;
}
}
uint16_t number_of_packets =
packets_.back().sequence_number - first_sequence_number + 1;
total_size += increase_size_per_ssrc(number_of_packets);

View File

@@ -12,7 +12,6 @@
#include <limits>
#include <vector>
#include "array_view.h"
#include "enc_mark.h"
#include "rtp_feedback.h"
@@ -40,7 +39,7 @@ class CongestionControlFeedback : public RtpFeedback {
bool Parse(const RtcpCommonHeader& packet);
ArrayView<const PacketInfo> packets() const { return packets_; }
std::vector<PacketInfo> packets() const { return packets_; }
uint32_t report_timestamp_compact_ntp() const {
return report_timestamp_compact_ntp_;

View File

@@ -0,0 +1,53 @@
/*
* @Author: DI JUNKUN
* @Date: 2025-01-10
* Copyright (c) 2025 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _RTCP_SENDER_H_
#define _RTCP_SENDER_H_
#define IP_PACKET_SIZE 1500
#include <functional>
#include <vector>
#include "log.h"
class RTCPSender {
public:
RTCPSender(std::function<int(const uint8_t*, size_t)> callback,
size_t max_packet_size)
: callback_(callback), max_packet_size_(max_packet_size) {
if (max_packet_size > IP_PACKET_SIZE) {
LOG_ERROR("max_packet_size must be less than IP_PACKET_SIZE");
}
}
~RTCPSender() {
if (index_ == 0) {
LOG_ERROR("Unsent rtcp packet");
}
}
// Appends a packet to pending compound packet.
// Sends rtcp packet if buffer is full and resets the buffer.
void AppendPacket(const RtcpPacket& packet) {
packet.Create(buffer_, &index_, max_packet_size_, callback_);
}
// Sends pending rtcp packet.
void Send() {
if (index_ > 0 && callback_) {
callback_(buffer_, index_);
index_ = 0;
}
}
private:
std::function<int(const uint8_t*, size_t)> callback_ = nullptr;
const size_t max_packet_size_;
size_t index_ = 0;
uint8_t buffer_[IP_PACKET_SIZE];
};
#endif

View File

@@ -1,8 +1,8 @@
#include "rtp_codec.h"
#include <chrono>
#include <random>
#include "common.h"
#include "log.h"
#include "obu_parser.h"
@@ -15,22 +15,6 @@ constexpr int kObuTypeSequenceHeader = 1;
using namespace obu;
uint32_t GenerateRandomSSRC() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<uint32_t> dis(1, 0xFFFFFFFF);
return dis(gen);
}
uint32_t GenerateUniqueSsrc() {
uint32_t new_ssrc;
do {
new_ssrc = GenerateRandomSSRC();
} while (SSRCManager::Instance().Contains(new_ssrc));
SSRCManager::Instance().AddSsrc(new_ssrc);
return new_ssrc;
}
RtpCodec::RtpCodec(RtpPacket::PAYLOAD_TYPE payload_type)
: version_(RTP_VERSION),
has_padding_(false),

View File

@@ -3,7 +3,6 @@
#include <cstddef>
#include <cstdint>
#include <unordered_set>
#include <vector>
#include "fec_encoder.h"
@@ -56,36 +55,4 @@ class RtpCodec {
FecEncoder fec_encoder_;
};
class SSRCManager {
public:
static SSRCManager& Instance() {
static SSRCManager instance;
return instance;
}
void AddSsrc(uint32_t ssrc) {
std::lock_guard<std::mutex> lock(mutex_);
ssrcs_.insert(ssrc);
}
void DeleteSsrc(uint32_t ssrc) {
std::lock_guard<std::mutex> lock(mutex_);
ssrcs_.erase(ssrc);
}
bool Contains(uint32_t ssrc) {
std::lock_guard<std::mutex> lock(mutex_);
return ssrcs_.count(ssrc) > 0;
}
private:
SSRCManager() = default;
~SSRCManager() = default;
SSRCManager(const SSRCManager&) = delete;
SSRCManager& operator=(const SSRCManager&) = delete;
std::unordered_set<uint32_t> ssrcs_;
std::mutex mutex_;
};
#endif