From 2512e1eb15296f27c6c8619b3a7d63af473b531a Mon Sep 17 00:00:00 2001 From: dijunkun Date: Wed, 18 Dec 2024 17:27:42 +0800 Subject: [PATCH] [feat] update congestion control feedback --- src/common/array_view.h | 257 +++++++++++++++ src/{rtcp => common}/byte_io.h | 0 src/common/type_traits.h | 141 ++++++++ src/qos/congestion_control_feedback.cc | 302 ++++++++++++++++++ src/qos/congestion_control_feedback.h | 78 +++++ .../congestion_control_feedback_generator.cc | 108 +++++++ .../congestion_control_feedback_generator.h | 59 ++++ .../congestion_control_feedback_tracker.cc | 87 +++++ src/qos/congestion_control_feedback_tracker.h | 43 +++ src/qos/receive_side_congestion_controller.cc | 156 +++++++++ src/qos/receive_side_congestion_controller.h | 69 ++++ src/rtcp/{ => rtcp_packet}/rtcp_packet.cpp | 0 src/rtcp/{ => rtcp_packet}/rtcp_packet.h | 0 src/rtcp/rtp_feedback/rtp_feedback.cpp | 29 ++ src/rtcp/rtp_feedback/rtp_feedback.h | 37 +++ src/rtp/rtp_packet/rtp_packet_received.cpp | 14 + src/rtp/rtp_packet/rtp_packet_received.h | 31 ++ xmake.lua | 8 +- 18 files changed, 1417 insertions(+), 2 deletions(-) create mode 100644 src/common/array_view.h rename src/{rtcp => common}/byte_io.h (100%) create mode 100644 src/common/type_traits.h create mode 100644 src/qos/congestion_control_feedback.cc create mode 100644 src/qos/congestion_control_feedback.h create mode 100644 src/qos/congestion_control_feedback_generator.cc create mode 100644 src/qos/congestion_control_feedback_generator.h create mode 100644 src/qos/congestion_control_feedback_tracker.cc create mode 100644 src/qos/congestion_control_feedback_tracker.h create mode 100644 src/qos/receive_side_congestion_controller.cc create mode 100644 src/qos/receive_side_congestion_controller.h rename src/rtcp/{ => rtcp_packet}/rtcp_packet.cpp (100%) rename src/rtcp/{ => rtcp_packet}/rtcp_packet.h (100%) create mode 100644 src/rtcp/rtp_feedback/rtp_feedback.cpp create mode 100644 src/rtcp/rtp_feedback/rtp_feedback.h create mode 100644 src/rtp/rtp_packet/rtp_packet_received.cpp create mode 100644 src/rtp/rtp_packet/rtp_packet_received.h diff --git a/src/common/array_view.h b/src/common/array_view.h new file mode 100644 index 0000000..1752f75 --- /dev/null +++ b/src/common/array_view.h @@ -0,0 +1,257 @@ +/* + * @Author: DI JUNKUN + * @Date: 2024-12-18 + * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _ARRAY_VIEW_H_ +#define _ARRAY_VIEW_H_ + +#include +#include +#include +#include +#include + +#include "type_traits.h" + +namespace array_view_internal { + +// Magic constant for indicating that the size of an ArrayView is variable +// instead of fixed. +enum : std::ptrdiff_t { kArrayViewVarSize = -4711 }; + +// Base class for ArrayViews of fixed nonzero size. +template +class ArrayViewBase { + static_assert(Size > 0, "ArrayView size must be variable or non-negative"); + + public: + ArrayViewBase(T* data, size_t /* size */) : data_(data) {} + + static constexpr size_t size() { return Size; } + static constexpr bool empty() { return false; } + T* data() const { return data_; } + + protected: + static constexpr bool fixed_size() { return true; } + + private: + T* data_; +}; + +// Specialized base class for ArrayViews of fixed zero size. +template +class ArrayViewBase { + public: + explicit ArrayViewBase(T* /* data */, size_t /* size */) {} + + static constexpr size_t size() { return 0; } + static constexpr bool empty() { return true; } + T* data() const { return nullptr; } + + protected: + static constexpr bool fixed_size() { return true; } +}; + +// Specialized base class for ArrayViews of variable size. +template +class ArrayViewBase { + public: + ArrayViewBase(T* data, size_t size) + : data_(size == 0 ? nullptr : data), size_(size) {} + + size_t size() const { return size_; } + bool empty() const { return size_ == 0; } + T* data() const { return data_; } + + protected: + static constexpr bool fixed_size() { return false; } + + private: + T* data_; + size_t size_; +}; + +} // namespace array_view_internal + +template +class ArrayView final : public array_view_internal::ArrayViewBase { + public: + using value_type = T; + using reference = value_type&; + using const_reference = const value_type&; + using pointer = value_type*; + using const_pointer = const value_type*; + using const_iterator = const T*; + + // Construct an ArrayView from a pointer and a length. + template + ArrayView(U* data, size_t size) + : array_view_internal::ArrayViewBase::ArrayViewBase(data, size) { + RTC_DCHECK_EQ(size == 0 ? nullptr : data, this->data()); + RTC_DCHECK_EQ(size, this->size()); + RTC_DCHECK_EQ(!this->data(), + this->size() == 0); // data is null iff size == 0. + } + + // Construct an empty ArrayView. Note that fixed-size ArrayViews of size > 0 + // cannot be empty. + ArrayView() : ArrayView(nullptr, 0) {} + ArrayView(std::nullptr_t) // NOLINT + : ArrayView() {} + ArrayView(std::nullptr_t, size_t size) + : ArrayView(static_cast(nullptr), size) { + static_assert(Size == 0 || Size == array_view_internal::kArrayViewVarSize, + ""); + RTC_DCHECK_EQ(0, size); + } + + // Construct an ArrayView from a C-style array. + template + ArrayView(U (&array)[N]) // NOLINT + : ArrayView(array, N) { + static_assert(Size == N || Size == array_view_internal::kArrayViewVarSize, + "Array size must match ArrayView size"); + } + + // (Only if size is fixed.) Construct a fixed size ArrayView from a + // non-const std::array instance. For an ArrayView with variable size, the + // used ctor is ArrayView(U& u) instead. + template (N)>::type* = nullptr> + ArrayView(std::array& u) // NOLINT + : ArrayView(u.data(), u.size()) {} + + // (Only if size is fixed.) Construct a fixed size ArrayView where T is + // const from a const(expr) std::array instance. For an ArrayView with + // variable size, the used ctor is ArrayView(U& u) instead. + template (N)>::type* = nullptr> + ArrayView(const std::array& u) // NOLINT + : ArrayView(u.data(), u.size()) {} + + // (Only if size is fixed.) Construct an ArrayView from any type U that has a + // static constexpr size() method whose return value is equal to Size, and a + // data() method whose return value converts implicitly to T*. In particular, + // this means we allow conversion from ArrayView to ArrayView, but not the other way around. We also don't allow conversion from + // ArrayView to ArrayView, or from ArrayView to ArrayView when M != N. + template ::value>::type* = nullptr> + ArrayView(U& u) // NOLINT + : ArrayView(u.data(), u.size()) { + static_assert(U::size() == Size, "Sizes must match exactly"); + } + template ::value>::type* = nullptr> + ArrayView(const U& u) // NOLINT(runtime/explicit) + : ArrayView(u.data(), u.size()) { + static_assert(U::size() == Size, "Sizes must match exactly"); + } + + // (Only if size is variable.) Construct an ArrayView from any type U that + // has a size() method whose return value converts implicitly to size_t, and + // a data() method whose return value converts implicitly to T*. In + // particular, this means we allow conversion from ArrayView to + // ArrayView, but not the other way around. Other allowed + // conversions include + // ArrayView to ArrayView or ArrayView, + // std::vector to ArrayView or ArrayView, + // const std::vector to ArrayView, + // rtc::Buffer to ArrayView or ArrayView, and + // const rtc::Buffer to ArrayView. + template ::value>::type* = nullptr> + ArrayView(U& u) // NOLINT + : ArrayView(u.data(), u.size()) {} + template ::value>::type* = nullptr> + ArrayView(const U& u) // NOLINT(runtime/explicit) + : ArrayView(u.data(), u.size()) {} + + // Indexing and iteration. These allow mutation even if the ArrayView is + // const, because the ArrayView doesn't own the array. (To prevent mutation, + // use a const element type.) + T& operator[](size_t idx) const { + RTC_DCHECK_LT(idx, this->size()); + RTC_DCHECK(this->data()); + return this->data()[idx]; + } + T* begin() const { return this->data(); } + T* end() const { return this->data() + this->size(); } + const T* cbegin() const { return this->data(); } + const T* cend() const { return this->data() + this->size(); } + std::reverse_iterator rbegin() const { + return std::make_reverse_iterator(end()); + } + std::reverse_iterator rend() const { + return std::make_reverse_iterator(begin()); + } + std::reverse_iterator crbegin() const { + return std::make_reverse_iterator(cend()); + } + std::reverse_iterator crend() const { + return std::make_reverse_iterator(cbegin()); + } + + ArrayView subview(size_t offset, size_t size) const { + return offset < this->size() + ? ArrayView(this->data() + offset, + std::min(size, this->size() - offset)) + : ArrayView(); + } + ArrayView subview(size_t offset) const { + return subview(offset, this->size()); + } +}; + +// Comparing two ArrayViews compares their (pointer,size) pairs; it does *not* +// dereference the pointers. +template +bool operator==(const ArrayView& a, const ArrayView& b) { + return a.data() == b.data() && a.size() == b.size(); +} +template +bool operator!=(const ArrayView& a, const ArrayView& b) { + return !(a == b); +} + +// Variable-size ArrayViews are the size of two pointers; fixed-size ArrayViews +// are the size of one pointer. (And as a special case, fixed-size ArrayViews +// of size 0 require no storage.) +static_assert(sizeof(ArrayView) == 2 * sizeof(int*), ""); +static_assert(sizeof(ArrayView) == sizeof(int*), ""); +static_assert(std::is_empty>::value, ""); + +template +inline ArrayView MakeArrayView(T* data, size_t size) { + return ArrayView(data, size); +} + +// Only for primitive types that have the same size and aligment. +// Allow reinterpret cast of the array view to another primitive type of the +// same size. +// Template arguments order is (U, T, Size) to allow deduction of the template +// arguments in client calls: reinterpret_array_view(array_view). +template +inline ArrayView reinterpret_array_view(ArrayView view) { + static_assert(sizeof(U) == sizeof(T) && alignof(U) == alignof(T), + "ArrayView reinterpret_cast is only supported for casting " + "between views that represent the same chunk of memory."); + static_assert( + std::is_fundamental::value && std::is_fundamental::value, + "ArrayView reinterpret_cast is only supported for casting between " + "fundamental types."); + return ArrayView(reinterpret_cast(view.data()), view.size()); +} + +#endif \ No newline at end of file diff --git a/src/rtcp/byte_io.h b/src/common/byte_io.h similarity index 100% rename from src/rtcp/byte_io.h rename to src/common/byte_io.h diff --git a/src/common/type_traits.h b/src/common/type_traits.h new file mode 100644 index 0000000..8f90a88 --- /dev/null +++ b/src/common/type_traits.h @@ -0,0 +1,141 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TYPE_TRAITS_H_ +#define RTC_BASE_TYPE_TRAITS_H_ + +#include +#include +#include + +namespace rtc { + +// Determines if the given class has zero-argument .data() and .size() methods +// whose return values are convertible to T* and size_t, respectively. +template +class HasDataAndSize { + private: + template < + typename C, + typename std::enable_if< + std::is_convertible().data()), T*>::value && + std::is_convertible().size()), + std::size_t>::value>::type* = nullptr> + static int Test(int); + + template + static char Test(...); + + public: + static constexpr bool value = std::is_same(0)), int>::value; +}; + +namespace test_has_data_and_size { + +template +struct Test1 { + DR data(); + SR size(); +}; +static_assert(HasDataAndSize, int>::value, ""); +static_assert(HasDataAndSize, const int>::value, ""); +static_assert(HasDataAndSize, const int>::value, ""); +static_assert(!HasDataAndSize, int>::value, + "implicit cast of const int* to int*"); +static_assert(!HasDataAndSize, int>::value, + "implicit cast of char* to int*"); + +struct Test2 { + int* data; + size_t size; +}; +static_assert(!HasDataAndSize::value, + ".data and .size aren't functions"); + +struct Test3 { + int* data(); +}; +static_assert(!HasDataAndSize::value, ".size() is missing"); + +class Test4 { + int* data(); + size_t size(); +}; +static_assert(!HasDataAndSize::value, + ".data() and .size() are private"); + +} // namespace test_has_data_and_size + +namespace type_traits_impl { + +// Determines if the given type is an enum that converts implicitly to +// an integral type. +template +struct IsIntEnum { + private: + // This overload is used if the type is an enum, and unary plus + // compiles and turns it into an integral type. + template ::value && + std::is_integral())>::value>::type* = + nullptr> + static int Test(int); + + // Otherwise, this overload is used. + template + static char Test(...); + + public: + static constexpr bool value = + std::is_same::type>(0)), + int>::value; +}; + +} // namespace type_traits_impl + +// Determines if the given type is integral, or an enum that +// converts implicitly to an integral type. +template +struct IsIntlike { + private: + using X = typename std::remove_reference::type; + + public: + static constexpr bool value = + std::is_integral::value || type_traits_impl::IsIntEnum::value; +}; + +namespace test_enum_intlike { + +enum E1 { e1 }; +enum { e2 }; +enum class E3 { e3 }; +struct S {}; + +static_assert(type_traits_impl::IsIntEnum::value, ""); +static_assert(type_traits_impl::IsIntEnum::value, ""); +static_assert(!type_traits_impl::IsIntEnum::value, ""); +static_assert(!type_traits_impl::IsIntEnum::value, ""); +static_assert(!type_traits_impl::IsIntEnum::value, ""); +static_assert(!type_traits_impl::IsIntEnum::value, ""); + +static_assert(IsIntlike::value, ""); +static_assert(IsIntlike::value, ""); +static_assert(!IsIntlike::value, ""); +static_assert(IsIntlike::value, ""); +static_assert(!IsIntlike::value, ""); +static_assert(!IsIntlike::value, ""); + +} // namespace test_enum_intlike + +} // namespace rtc + +#endif // RTC_BASE_TYPE_TRAITS_H_ diff --git a/src/qos/congestion_control_feedback.cc b/src/qos/congestion_control_feedback.cc new file mode 100644 index 0000000..1a05576 --- /dev/null +++ b/src/qos/congestion_control_feedback.cc @@ -0,0 +1,302 @@ +#include "congestion_control_feedback.h" + +#include +#include +#include +#include +#include + +#include "array_view.h" +#include "byte_io.h" +#include "log.h" + +/* + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P| FMT=11 | PT = 205 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of RTCP packet sender | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of 1st RTP Stream | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | begin_seq | num_reports | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |R|ECN| Arrival time offset | ... . + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + . . + . . + . . + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of nth RTP Stream | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | begin_seq | num_reports | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |R|ECN| Arrival time offset | ... | + . . + . . + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Report Timestamp (32 bits) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ + +namespace { + +constexpr size_t kSenderSsrcLength = 4; +constexpr size_t kHeaderPerMediaSssrcLength = 8; +constexpr size_t kTimestampLength = 4; + +// RFC-3168, Section 5 +constexpr uint16_t kEcnEct1 = 0x01; +constexpr uint16_t kEcnEct0 = 0x02; +constexpr uint16_t kEcnCe = 0x03; + +// Arrival time offset (ATO, 13 bits): +// The arrival time of the RTP packet at the receiver, as an offset before the +// time represented by the Report Timestamp (RTS) field of this RTCP congestion +// control feedback report. The ATO field is in units of 1/1024 seconds (this +// unit is chosen to give exact offsets from the RTS field) so, for example, an +// ATO value of 512 indicates that the corresponding RTP packet arrived exactly +// half a second before the time instant represented by the RTS field. If the +// measured value is greater than 8189/1024 seconds (the value that would be +// coded as 0x1FFD), the value 0x1FFE MUST be reported to indicate an over-range +// measurement. If the measurement is unavailable or if the arrival time of the +// RTP packet is after the time represented by the RTS field, then an ATO value +// of 0x1FFF MUST be reported for the packet. +uint16_t To13bitAto(int64_t arrival_time_offset) { + if (arrival_time_offset < 0) { + return 0x1FFF; + } + return std::min(static_cast(1024 * (arrival_time_offset / 1000)), + int64_t{0x1FFE}); +} + +int64_t AtoToTimeDelta(uint16_t receive_info) { + // receive_info + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // |R|ECN| Arrival time offset | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + // ato -> second + const uint16_t ato = receive_info & 0x1FFF; + if (ato == 0x1FFE) { + return std::numeric_limits::max(); + } + if (ato == 0x1FFF) { + return std::numeric_limits::min(); + } + return ato / 1024; +} + +uint16_t To2BitEcn(EcnMarking ecn_marking) { + switch (ecn_marking) { + case EcnMarking::kNotEct: + return 0; + case EcnMarking::kEct1: + return kEcnEct1 << 13; + case EcnMarking::kEct0: + return kEcnEct0 << 13; + case EcnMarking::kCe: + return kEcnCe << 13; + } +} + +EcnMarking ToEcnMarking(uint16_t receive_info) { + const uint16_t ecn = (receive_info >> 13) & 0b11; + if (ecn == kEcnEct1) { + return EcnMarking::kEct1; + } + if (ecn == kEcnEct0) { + return EcnMarking::kEct0; + } + if (ecn == kEcnCe) { + return EcnMarking::kCe; + } + return EcnMarking::kNotEct; +} + +} // namespace + +CongestionControlFeedback ::CongestionControlFeedback( + std::vector packets, uint32_t compact_ntp_timestamp) + : packets_(std::move(packets)), + report_timestamp_compact_ntp_(compact_ntp_timestamp) {} + +bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position, + size_t max_length, + PacketReadyCallback callback) const { + // Ensure there is enough room for this packet. + while (*position + BlockLength() > max_length) { + if (!OnBufferFull(buffer, position, callback)) return false; + } + const size_t position_end = *position + BlockLength(); + + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // |V=2|P| FMT=11 | PT = 205 | length | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | SSRC of RTCP packet sender | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + CreateHeader(kFeedbackMessageType, kPacketType, HeaderLength(), buffer, + position); + ByteWriter::WriteBigEndian(&buffer[*position], sender_ssrc()); + *position += 4; + + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | SSRC of nth RTP Stream | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | begin_seq | num_reports | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // |R|ECN| Arrival time offset | ... . + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // . . + auto write_report_for_ssrc = [&](ArrayView packets) { + // SSRC of nth RTP stream. + ByteWriter::WriteBigEndian(&buffer[*position], packets[0].ssrc); + *position += 4; + + // begin_seq + ByteWriter::WriteBigEndian(&buffer[*position], + packets[0].sequence_number); + *position += 2; + // num_reports + uint16_t num_reports = packets.size(); + RTC_DCHECK_EQ(static_cast( + + packets[packets.size() - 1].sequence_number - + packets[0].sequence_number + 1), + packets.size()) + << "Expected continous rtp sequence numbers."; + + // Each report block MUST NOT include more than 16384 packet metric + // blocks (i.e., it MUST NOT report on more than one quarter of the + // sequence number space in a single report). + if (num_reports > 16384) { + LOG_FATAL("Unexpected number of reports: {}", num_reports); + return; + } + ByteWriter::WriteBigEndian(&buffer[*position], num_reports); + *position += 2; + + for (const PacketInfo& packet : packets) { + bool received = packet.arrival_time_offset.IsFinite(); + uint16_t packet_info = 0; + if (received) { + packet_info = 0x8000 | To2BitEcn(packet.ecn) | + To13bitAto(packet.arrival_time_offset); + } + ByteWriter::WriteBigEndian(&buffer[*position], packet_info); + *position += 2; + } + // 32bit align per SSRC block. + if (num_reports % 2 != 0) { + ByteWriter::WriteBigEndian(&buffer[*position], 0); + *position += 2; + } + }; + + ArrayView remaining(packets_); + while (!remaining.empty()) { + int number_of_packets_for_ssrc = 0; + uint32_t ssrc = remaining[0].ssrc; + for (const PacketInfo& packet_info : remaining) { + if (packet_info.ssrc != ssrc) { + break; + } + ++number_of_packets_for_ssrc; + } + write_report_for_ssrc(remaining.subview(0, number_of_packets_for_ssrc)); + remaining = remaining.subview(number_of_packets_for_ssrc); + } + + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | Report Timestamp (32 bits) | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + ByteWriter::WriteBigEndian(&buffer[*position], + report_timestamp_compact_ntp_); + *position += 4; + + RTC_DCHECK_EQ(*position, position_end); + return true; +} + +size_t CongestionControlFeedback::BlockLength() const { + // Total size of this packet + size_t total_size = kSenderSsrcLength + kHeaderLength + kTimestampLength; + if (packets_.empty()) { + return total_size; + } + + auto increase_size_per_ssrc = [](int number_of_packets_for_ssrc) { + // Each packet report needs two bytes. + size_t packet_block_size = number_of_packets_for_ssrc * 2; + // 32 bit aligned. + return kHeaderPerMediaSssrcLength + packet_block_size + + ((number_of_packets_for_ssrc % 2) != 0 ? 2 : 0); + }; + + uint32_t ssrc = packets_.front().ssrc; + uint16_t first_sequence_number = packets_.front().sequence_number; + for (size_t i = 0; i < packets_.size(); ++i) { + if (packets_[i].ssrc != ssrc) { + uint16_t number_of_packets = + packets_[i - 1].sequence_number - first_sequence_number + 1; + total_size += increase_size_per_ssrc(number_of_packets); + ssrc = packets_[i].ssrc; + first_sequence_number = packets_[i].sequence_number; + } + } + uint16_t number_of_packets = + packets_.back().sequence_number - first_sequence_number + 1; + total_size += increase_size_per_ssrc(number_of_packets); + + return total_size; +} + +bool CongestionControlFeedback::Parse(const rtcp::CommonHeader& packet) { + const uint8_t* payload = packet.payload(); + const uint8_t* payload_end = packet.payload() + packet.payload_size_bytes(); + + if (packet.payload_size_bytes() % 4 != 0 || + packet.payload_size_bytes() < kSenderSsrcLength + kTimestampLength) { + return false; + } + + SetSenderSsrc(ByteReader::ReadBigEndian(payload)); + payload += 4; + + report_timestamp_compact_ntp_ = + ByteReader::ReadBigEndian(payload_end - 4); + payload_end -= 4; + + while (payload + kHeaderPerMediaSssrcLength < payload_end) { + uint32_t ssrc = ByteReader::ReadBigEndian(payload); + payload += 4; + + uint16_t base_seqno = ByteReader::ReadBigEndian(payload); + payload += 2; + uint16_t num_reports = ByteReader::ReadBigEndian(payload); + payload += 2; + + constexpr size_t kPerPacketLength = 2; + if (payload + kPerPacketLength * num_reports > payload_end) { + return false; + } + + for (int i = 0; i < num_reports; ++i) { + uint16_t packet_info = ByteReader::ReadBigEndian(payload); + payload += 2; + + uint16_t seq_no = base_seqno + i; + bool received = (packet_info & 0x8000); + packets_.push_back( + {.ssrc = ssrc, + .sequence_number = seq_no, + .arrival_time_offset = received ? AtoToTimeDelta(packet_info) + : TimeDelta::MinusInfinity(), + .ecn = ToEcnMarking(packet_info)}); + } + if (num_reports % 2) { + // 2 bytes padding + payload += 2; + } + } + return payload == payload_end; +} \ No newline at end of file diff --git a/src/qos/congestion_control_feedback.h b/src/qos/congestion_control_feedback.h new file mode 100644 index 0000000..7a99ecd --- /dev/null +++ b/src/qos/congestion_control_feedback.h @@ -0,0 +1,78 @@ +/* + * @Author: DI JUNKUN + * @Date: 2024-12-18 + * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _CONGESTION_CONTROL_FEEDBACK_H_ +#define _CONGESTION_CONTROL_FEEDBACK_H_ + +#include +#include +#include +#include + +#include "rtp_feedback.h" + +// L4S Explicit Congestion Notification (ECN) . +// https://www.rfc-editor.org/rfc/rfc9331.html ECT stands for ECN-Capable +// Transport and CE stands for Congestion Experienced. + +// RFC-3168, Section 5 +// +-----+-----+ +// | ECN FIELD | +// +-----+-----+ +// ECT CE [Obsolete] RFC 2481 names for the ECN bits. +// 0 0 Not-ECT +// 0 1 ECT(1) +// 1 0 ECT(0) +// 1 1 CE + +enum class EcnMarking { + kNotEct = 0, // Not ECN-Capable Transport + kEct1 = 1, // ECN-Capable Transport + kEct0 = 2, // Not used by L4s (or webrtc.) + kCe = 3, // Congestion experienced +}; + +// Congestion control feedback message as specified in +// https://www.rfc-editor.org/rfc/rfc8888.html +class CongestionControlFeedback : public RtpFeedback { + public: + struct PacketInfo { + uint32_t ssrc = 0; + uint16_t sequence_number = 0; + // Time offset from report timestamp. Minus infinity if the packet has not + // been received. + int64_t arrival_time_offset = std::numeric_limits::min(); + rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct; + }; + + static constexpr uint8_t kFeedbackMessageType = 11; + + // `Packets` MUST be sorted in sequence_number order per SSRC. There MUST not + // be missing sequence numbers between `Packets`. `Packets` MUST not include + // duplicate sequence numbers. + CongestionControlFeedback(std::vector packets, + uint32_t report_timestamp_compact_ntp); + CongestionControlFeedback() = default; + + bool Parse(const CommonHeader& packet); + + rtc::ArrayView packets() const { return packets_; } + + uint32_t report_timestamp_compact_ntp() const { + return report_timestamp_compact_ntp_; + } + + // Serialize the packet. + bool Create(uint8_t* packet, size_t* position, size_t max_length, + PacketReadyCallback callback) const override; + size_t BlockLength() const override; + + private: + std::vector packets_; + uint32_t report_timestamp_compact_ntp_ = 0; +}; + +#endif \ No newline at end of file diff --git a/src/qos/congestion_control_feedback_generator.cc b/src/qos/congestion_control_feedback_generator.cc new file mode 100644 index 0000000..7781776 --- /dev/null +++ b/src/qos/congestion_control_feedback_generator.cc @@ -0,0 +1,108 @@ +#include "congestion_control_feedback_generator.h" + +#include +#include +#include +#include +#include +#include + +uint32_t ConvertToCompactNtp(int64_t now_ms) { + int64_t seconds = now_ms / 1000; + int64_t milliseconds = now_ms % 1000; + uint16_t ntp_seconds = static_cast(seconds & 0xFFFF); + uint16_t ntp_fraction = static_cast((milliseconds * 65536) / 1000); + uint32_t compact_ntp = (ntp_seconds << 16) | ntp_fraction; + return compact_ntp; +} + +CongestionControlFeedbackGenerator::CongestionControlFeedbackGenerator( + RtcpSender rtcp_sender) + : rtcp_sender_(std::move(rtcp_sender)) {} + +void CongestionControlFeedbackGenerator::OnReceivedPacket( + const RtpPacketReceived& packet) { + marker_bit_seen_ |= packet.Marker(); + if (!first_arrival_time_since_feedback_) { + first_arrival_time_since_feedback_ = packet.arrival_time(); + } + feedback_trackers_[packet.Ssrc()].ReceivedPacket(packet); + if (NextFeedbackTime() < packet.arrival_time()) { + auto now = std::chrono::system_clock::now(); + auto now_ms = std::chrono::duration_cast( + now.time_since_epoch()) + .count(); + SendFeedback(now_ms); + } +} + +int64_t CongestionControlFeedbackGenerator::NextFeedbackTime() const { + auto now = std::chrono::system_clock::now(); + auto now_ms = std::chrono::duration_cast( + now.time_since_epoch()) + .count(); + + if (!first_arrival_time_since_feedback_) { + return std::max(now_ms + min_time_between_feedback_, + next_possible_feedback_send_time_); + } + + if (!marker_bit_seen_) { + return std::max(next_possible_feedback_send_time_, + *first_arrival_time_since_feedback_ + + max_time_to_wait_for_packet_with_marker_); + } + return next_possible_feedback_send_time_; +} + +int64_t CongestionControlFeedbackGenerator::Process(int64_t now_ms) { + if (NextFeedbackTime() <= now_ms) { + SendFeedback(now_ms); + } + return NextFeedbackTime() - now_ms; +} + +void CongestionControlFeedbackGenerator::OnSendBandwidthEstimateChanged( + DataRate estimate) { + // Feedback reports should max occupy 5% of total bandwidth. + max_feedback_rate_ = estimate * 0.05; +} + +void CongestionControlFeedbackGenerator::SetTransportOverhead( + DataSize overhead_per_packet) { + packet_overhead_ = overhead_per_packet; +} + +void CongestionControlFeedbackGenerator::SendFeedback(int64_t now_ms) { + uint32_t compact_ntp = ConvertToCompactNtp(now_ms); + std::vector rtcp_packet_info; + for (auto& [unused, tracker] : feedback_trackers_) { + tracker.AddPacketsToFeedback(now_ms, rtcp_packet_info); + } + + marker_bit_seen_ = false; + first_arrival_time_since_feedback_ = std::nullopt; + + auto feedback = std::make_unique( + std::move(rtcp_packet_info), compact_ntp); + CalculateNextPossibleSendTime(feedback->BlockLength(), now_ms); + + std::vector> rtcp_packets; + rtcp_packets.push_back(std::move(feedback)); + rtcp_sender_(std::move(rtcp_packets)); +} + +void CongestionControlFeedbackGenerator::CalculateNextPossibleSendTime( + int64_t feedback_size, int64_t now_ms) { + int64_t time_since_last_sent = now - last_feedback_sent_time_; + size_t debt_payed = time_since_last_sent * max_feedback_rate_; + send_rate_debt_ = + debt_payed > send_rate_debt_ ? 0 : send_rate_debt_ - debt_payed; + send_rate_debt_ += feedback_size + packet_overhead_; + last_feedback_sent_time_ = now_ms; + next_possible_feedback_send_time_ = + now_ms + + std::clamp(max_feedback_rate_ == 0 ? std::numeric_limits::max() + : send_rate_debt_ / max_feedback_rate_, + min_time_between_feedback_, max_time_between_feedback_); +} diff --git a/src/qos/congestion_control_feedback_generator.h b/src/qos/congestion_control_feedback_generator.h new file mode 100644 index 0000000..1e3e650 --- /dev/null +++ b/src/qos/congestion_control_feedback_generator.h @@ -0,0 +1,59 @@ +/* + * @Author: DI JUNKUN + * @Date: 2024-12-18 + * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _CONGESTION_CONTROL_FEEDBACK_GENERATOR_H_ +#define _CONGESTION_CONTROL_FEEDBACK_GENERATOR_H_ + +#include "rtp_packet_received.h" + +class CongestionControlFeedbackGenerator + : public RtpTransportFeedbackGenerator { + public: + CongestionControlFeedbackGenerator( + RtpTransportFeedbackGenerator::RtcpSender feedback_sender); + ~CongestionControlFeedbackGenerator() = default; + + void OnReceivedPacket(const RtpPacketReceived& packet) override; + + void OnSendBandwidthEstimateChanged(DataRate estimate) override; + + int64_t Process(int64_t now_ms) override; + + void SetTransportOverhead(DataSize overhead_per_packet) override; + + private: + int64_t NextFeedbackTime() const; + + void SendFeedback(int64_t now_ms); + + void CalculateNextPossibleSendTime(int64_t feedback_size, int64_t now_ms); + + private: + // Feedback should not use more than 5% of the configured send bandwidth + // estimate. Min and max duration between feedback is configurable using field + // trials, but per default, min is 25ms and max is 250ms. + // If possible, given the other constraints, feedback will be sent when a + // packet with marker bit is received in order to provide feedback as soon as + // possible after receiving a complete video frame. If no packet with marker + // bit is received, feedback can be delayed up to 25ms after the first packet + // since the last sent feedback. On good networks, this means that a sender + // may receive feedback for every sent frame. + int64_t min_time_between_feedback_ = 25; + int64_t max_time_between_feedback_ = 250; + int64_t max_time_to_wait_for_packet_with_marker_ = 25; + + int64_t max_feedback_rate_ = 1000; // kbps + int64_t packet_overhead_ = 0; + int64_t send_rate_debt_ = 0; + + std::optional first_arrival_time_since_feedback_; + int64_t next_possible_feedback_send_time_ = 0; + int64_t last_feedback_sent_time_ = 0; + + bool marker_bit_seen_ = false; +}; + +#endif \ No newline at end of file diff --git a/src/qos/congestion_control_feedback_tracker.cc b/src/qos/congestion_control_feedback_tracker.cc new file mode 100644 index 0000000..86a3573 --- /dev/null +++ b/src/qos/congestion_control_feedback_tracker.cc @@ -0,0 +1,87 @@ +#include "congestion_control_feedback_tracker.h" + +#include +#include +#include + +void CongestionControlFeedbackTracker::ReceivedPacket( + const RtpPacketReceived& packet) { + int64_t unwrapped_sequence_number = + unwrapper_.Unwrap(packet.SequenceNumber()); + if (last_sequence_number_in_feedback_ && + unwrapped_sequence_number < *last_sequence_number_in_feedback_ + 1) { + RTC_LOG(LS_WARNING) + << "Received packet unorderered between feeedback. SSRC: " + << packet.Ssrc() << " Seq: " << packet.SequenceNumber() + << " last feedback: " + << static_cast(*last_sequence_number_in_feedback_); + // TODO: bugs.webrtc.org/374550342 - According to spec, the old packets + // should be reported again. But at the moment, we dont store history of + // packet we already reported and thus, they will be reported as lost. Note + // that this is likely not a problem in webrtc since the packets will also + // be removed from the send history when they are first reported as + // received. + last_sequence_number_in_feedback_ = unwrapped_sequence_number - 1; + } + packets_.push_back({.ssrc = packet.Ssrc(), + .unwrapped_sequence_number = unwrapped_sequence_number, + .arrival_time = packet.arrival_time(), + .ecn = packet.ecn()}); +} + +void CongestionControlFeedbackTracker::AddPacketsToFeedback( + int64_t feedback_time, + std::vector& packet_feedback) { + if (packets_.empty()) { + return; + } + absl::c_sort(packets_, [](const PacketInfo& a, const PacketInfo& b) { + return std::tie(a.unwrapped_sequence_number, a.arrival_time) < + std::tie(b.unwrapped_sequence_number, b.arrival_time); + }); + if (!last_sequence_number_in_feedback_) { + last_sequence_number_in_feedback_ = + packets_.front().unwrapped_sequence_number - 1; + } + + auto packet_it = packets_.begin(); + uint32_t ssrc = packet_it->ssrc; + for (int64_t sequence_number = *last_sequence_number_in_feedback_ + 1; + sequence_number <= packets_.back().unwrapped_sequence_number; + ++sequence_number) { + RTC_DCHECK(packet_it != packets_.end()); + RTC_DCHECK_EQ(ssrc, packet_it->ssrc); + + rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct; + TimeDelta arrival_time_offset = TimeDelta::MinusInfinity(); + + if (sequence_number == packet_it->unwrapped_sequence_number) { + arrival_time_offset = feedback_time - packet_it->arrival_time; + ecn = packet_it->ecn; + ++packet_it; + while (packet_it != packets_.end() && + packet_it->unwrapped_sequence_number == sequence_number) { + // According to RFC 8888: + // If duplicate copies of a particular RTP packet are received, then the + // arrival time of the first copy to arrive MUST be reported. If any of + // the copies of the duplicated packet are ECN-CE marked, then an ECN-CE + // mark MUST be reported for that packet; otherwise, the ECN mark of the + // first copy to arrive is reported. + if (packet_it->ecn == rtc::EcnMarking::kCe) { + ecn = rtc::EcnMarking::kCe; + } + RTC_LOG(LS_WARNING) << "Received duplicate packet ssrc:" << ssrc + << " seq:" << static_cast(sequence_number) + << " ecn: " << static_cast(ecn); + ++packet_it; + } + } // else - the packet has not been received yet. + packet_feedback.push_back( + {.ssrc = ssrc, + .sequence_number = static_cast(sequence_number), + .arrival_time_offset = arrival_time_offset, + .ecn = ecn}); + } + last_sequence_number_in_feedback_ = packets_.back().unwrapped_sequence_number; + packets_.clear(); +} diff --git a/src/qos/congestion_control_feedback_tracker.h b/src/qos/congestion_control_feedback_tracker.h new file mode 100644 index 0000000..4ab81f4 --- /dev/null +++ b/src/qos/congestion_control_feedback_tracker.h @@ -0,0 +1,43 @@ +/* + * @Author: DI JUNKUN + * @Date: 2024-12-18 + * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _CONGESTION_CONTROL_FEEDBACK_TRACKER_H_ +#define _CONGESTION_CONTROL_FEEDBACK_TRACKER_H_ + +#include +#include + +#include "rtp_packet_received.h" + +class CongestionControlFeedbackTracker { + public: + CongestionControlFeedbackTracker() = default; + + void ReceivedPacket(const RtpPacketReceived& packet); + + // Adds received packets to `packet_feedback` + // RTP sequence numbers are continous from the last created feedback unless + // reordering has occured between feedback packets. If so, the sequence + // number range may overlap with previousely sent feedback. + void AddPacketsToFeedback( + int64_t feedback_time, + std::vector& packet_feedback); + + private: + struct PacketInfo { + uint32_t ssrc; + int64_t unwrapped_sequence_number = 0; + int64_t arrival_time; + rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct; + }; + + std::optional last_sequence_number_in_feedback_; + SeqNumUnwrapper unwrapper_; + + std::vector packets_; +}; + +#endif \ No newline at end of file diff --git a/src/qos/receive_side_congestion_controller.cc b/src/qos/receive_side_congestion_controller.cc new file mode 100644 index 0000000..a8afd8a --- /dev/null +++ b/src/qos/receive_side_congestion_controller.cc @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "receive_side_congestion_controller.h" + +#include +#include +#include + +namespace { +static const uint32_t kTimeOffsetSwitchThreshold = 30; +} // namespace + +void ReceiveSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, + int64_t max_rtt_ms) { + MutexLock lock(&mutex_); + rbe_->OnRttUpdate(avg_rtt_ms, max_rtt_ms); +} + +void ReceiveSideCongestionController::RemoveStream(uint32_t ssrc) { + MutexLock lock(&mutex_); + rbe_->RemoveStream(ssrc); +} + +DataRate ReceiveSideCongestionController::LatestReceiveSideEstimate() const { + MutexLock lock(&mutex_); + return rbe_->LatestEstimate(); +} + +void ReceiveSideCongestionController::PickEstimator( + bool has_absolute_send_time) { + if (has_absolute_send_time) { + // If we see AST in header, switch RBE strategy immediately. + if (!using_absolute_send_time_) { + RTC_LOG(LS_INFO) + << "WrappingBitrateEstimator: Switching to absolute send time RBE."; + using_absolute_send_time_ = true; + rbe_ = std::make_unique( + env_, &remb_throttler_); + } + packets_since_absolute_send_time_ = 0; + } else { + // When we don't see AST, wait for a few packets before going back to TOF. + if (using_absolute_send_time_) { + ++packets_since_absolute_send_time_; + if (packets_since_absolute_send_time_ >= kTimeOffsetSwitchThreshold) { + RTC_LOG(LS_INFO) + << "WrappingBitrateEstimator: Switching to transmission " + "time offset RBE."; + using_absolute_send_time_ = false; + rbe_ = std::make_unique( + env_, &remb_throttler_); + } + } + } +} + +ReceiveSideCongestionController::ReceiveSideCongestionController( + const Environment& env, + TransportSequenceNumberFeedbackGenenerator::RtcpSender feedback_sender, + RembThrottler::RembSender remb_sender, + absl::Nullable network_state_estimator) + : env_(env), + remb_throttler_(std::move(remb_sender), &env_.clock()), + transport_sequence_number_feedback_generator_(feedback_sender, + network_state_estimator), + congestion_control_feedback_generator_(env, feedback_sender), + rbe_(std::make_unique( + env_, &remb_throttler_)), + using_absolute_send_time_(false), + packets_since_absolute_send_time_(0) { + FieldTrialParameter force_send_rfc8888_feedback("force_send", false); + ParseFieldTrial( + {&force_send_rfc8888_feedback}, + env.field_trials().Lookup("WebRTC-RFC8888CongestionControlFeedback")); + if (force_send_rfc8888_feedback) { + EnablSendCongestionControlFeedbackAccordingToRfc8888(); + } +} + +void ReceiveSideCongestionController:: + EnablSendCongestionControlFeedbackAccordingToRfc8888() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + send_rfc8888_congestion_feedback_ = true; +} + +void ReceiveSideCongestionController::OnReceivedPacket( + const RtpPacketReceived& packet, MediaType media_type) { + if (send_rfc8888_congestion_feedback_) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + congestion_control_feedback_generator_.OnReceivedPacket(packet); + return; + } + bool has_transport_sequence_number = + packet.HasExtension() || + packet.HasExtension(); + if (media_type == MediaType::AUDIO && !has_transport_sequence_number) { + // For audio, we only support send side BWE. + return; + } + + if (has_transport_sequence_number) { + // Send-side BWE. + transport_sequence_number_feedback_generator_.OnReceivedPacket(packet); + } else { + // Receive-side BWE. + MutexLock lock(&mutex_); + PickEstimator(packet.HasExtension()); + rbe_->IncomingPacket(packet); + } +} + +void ReceiveSideCongestionController::OnBitrateChanged(int bitrate_bps) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + DataRate send_bandwidth_estimate = DataRate::BitsPerSec(bitrate_bps); + transport_sequence_number_feedback_generator_.OnSendBandwidthEstimateChanged( + send_bandwidth_estimate); + congestion_control_feedback_generator_.OnSendBandwidthEstimateChanged( + send_bandwidth_estimate); +} + +TimeDelta ReceiveSideCongestionController::MaybeProcess() { + Timestamp now = env_.clock().CurrentTime(); + if (send_rfc8888_congestion_feedback_) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + return congestion_control_feedback_generator_.Process(now); + } + mutex_.Lock(); + TimeDelta time_until_rbe = rbe_->Process(); + mutex_.Unlock(); + TimeDelta time_until_rep = + transport_sequence_number_feedback_generator_.Process(now); + TimeDelta time_until = std::min(time_until_rbe, time_until_rep); + return std::max(time_until, TimeDelta::Zero()); +} + +void ReceiveSideCongestionController::SetMaxDesiredReceiveBitrate( + DataRate bitrate) { + remb_throttler_.SetMaxDesiredReceiveBitrate(bitrate); +} + +void ReceiveSideCongestionController::SetTransportOverhead( + DataSize overhead_per_packet) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + transport_sequence_number_feedback_generator_.SetTransportOverhead( + overhead_per_packet); + congestion_control_feedback_generator_.SetTransportOverhead( + overhead_per_packet); +} diff --git a/src/qos/receive_side_congestion_controller.h b/src/qos/receive_side_congestion_controller.h new file mode 100644 index 0000000..2b978ed --- /dev/null +++ b/src/qos/receive_side_congestion_controller.h @@ -0,0 +1,69 @@ +/* + * @Author: DI JUNKUN + * @Date: 2024-12-12 + * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _RECEIVE_SIDE_CONGESTION_CONTROLLER_H_ +#define _RECEIVE_SIDE_CONGESTION_CONTROLLER_H_ + +class ReceiveSideCongestionController { + public: + ReceiveSideCongestionController(); + ~ReceiveSideCongestionController() override = default; + + public: + void EnablSendCongestionControlFeedbackAccordingToRfc8888(); + + void OnReceivedPacket(const RtpPacketReceived& packet, MediaType media_type); + + // Implements CallStatsObserver. + void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override; + + // This is send bitrate, used to control the rate of feedback messages. + void OnBitrateChanged(int bitrate_bps); + + // Ensures the remote party is notified of the receive bitrate no larger than + // `bitrate` using RTCP REMB. + void SetMaxDesiredReceiveBitrate(DataRate bitrate); + + void SetTransportOverhead(DataSize overhead_per_packet); + + // Returns latest receive side bandwidth estimation. + // Returns zero if receive side bandwidth estimation is unavailable. + DataRate LatestReceiveSideEstimate() const; + + // Removes stream from receive side bandwidth estimation. + // Noop if receive side bwe is not used or stream doesn't participate in it. + void RemoveStream(uint32_t ssrc); + + // Runs periodic tasks if it is time to run them, returns time until next + // call to `MaybeProcess` should be non idle. + TimeDelta MaybeProcess(); + + private: + void PickEstimator(bool has_absolute_send_time) + RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + const Environment env_; + RembThrottler remb_throttler_; + + // TODO: bugs.webrtc.org/42224904 - Use sequence checker for all usage of + // ReceiveSideCongestionController. At the time of + // writing OnReceivedPacket and MaybeProcess can unfortunately be called on an + // arbitrary thread by external projects. + SequenceChecker sequence_checker_; + + bool send_rfc8888_congestion_feedback_ = false; + TransportSequenceNumberFeedbackGenenerator + transport_sequence_number_feedback_generator_; + CongestionControlFeedbackGenerator congestion_control_feedback_generator_ + RTC_GUARDED_BY(sequence_checker_); + + mutable Mutex mutex_; + std::unique_ptr rbe_ RTC_GUARDED_BY(mutex_); + bool using_absolute_send_time_ RTC_GUARDED_BY(mutex_); + uint32_t packets_since_absolute_send_time_ RTC_GUARDED_BY(mutex_); +}; + +#endif \ No newline at end of file diff --git a/src/rtcp/rtcp_packet.cpp b/src/rtcp/rtcp_packet/rtcp_packet.cpp similarity index 100% rename from src/rtcp/rtcp_packet.cpp rename to src/rtcp/rtcp_packet/rtcp_packet.cpp diff --git a/src/rtcp/rtcp_packet.h b/src/rtcp/rtcp_packet/rtcp_packet.h similarity index 100% rename from src/rtcp/rtcp_packet.h rename to src/rtcp/rtcp_packet/rtcp_packet.h diff --git a/src/rtcp/rtp_feedback/rtp_feedback.cpp b/src/rtcp/rtp_feedback/rtp_feedback.cpp new file mode 100644 index 0000000..4115b1b --- /dev/null +++ b/src/rtcp/rtp_feedback/rtp_feedback.cpp @@ -0,0 +1,29 @@ +#include "rtp_feedback.h" + +#include "byte_io.h" + +// RFC 4585, Section 6.1: Feedback format. +// +// Common packet format: +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |V=2|P| FMT | PT | length | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// 0 | SSRC of packet sender | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// 4 | SSRC of media source | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// : Feedback Control Information (FCI) : +// : : + +void RtpFeedback::ParseCommonFeedback(const uint8_t* payload) { + SetSenderSsrc(ByteReader::ReadBigEndian(&payload[0])); + SetMediaSsrc(ByteReader::ReadBigEndian(&payload[4])); +} + +void RtpFeedback::CreateCommonFeedback(uint8_t* payload) const { + ByteWriter::WriteBigEndian(&payload[0], sender_ssrc()); + ByteWriter::WriteBigEndian(&payload[4], media_ssrc()); +} \ No newline at end of file diff --git a/src/rtcp/rtp_feedback/rtp_feedback.h b/src/rtcp/rtp_feedback/rtp_feedback.h new file mode 100644 index 0000000..3825220 --- /dev/null +++ b/src/rtcp/rtp_feedback/rtp_feedback.h @@ -0,0 +1,37 @@ +/* + * @Author: DI JUNKUN + * @Date: 2024-12-18 + * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _RTP_FEEDBACK_H_ +#define _RTP_FEEDBACK_H_ + +#include +#include + +#include "rtcp_packet.h" + +// RTPFB: Transport layer feedback message. +// RFC4585, Section 6.2 +class RtpFeedback : public RtcpPacket { + public: + static constexpr uint8_t kPacketType = 205; + + RtpFeedback() = default; + ~RtpFeedback() override = default; + + void SetMediaSsrc(uint32_t ssrc) { media_ssrc_ = ssrc; } + + uint32_t media_ssrc() const { return media_ssrc_; } + + protected: + static constexpr size_t kCommonFeedbackLength = 8; + void ParseCommonFeedback(const uint8_t* payload); + void CreateCommonFeedback(uint8_t* payload) const; + + private: + uint32_t media_ssrc_ = 0; +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_packet/rtp_packet_received.cpp b/src/rtp/rtp_packet/rtp_packet_received.cpp new file mode 100644 index 0000000..9101460 --- /dev/null +++ b/src/rtp/rtp_packet/rtp_packet_received.cpp @@ -0,0 +1,14 @@ +#include "rtp_packet_received.h" + +RtpPacketReceived::RtpPacketReceived() = default; +RtpPacketReceived::RtpPacketReceived(int64_t arrival_time) + : arrival_time_(arrival_time) {} +RtpPacketReceived::RtpPacketReceived(const RtpPacketReceived& packet) = default; +RtpPacketReceived::RtpPacketReceived(RtpPacketReceived&& packet) = default; + +RtpPacketReceived& RtpPacketReceived::operator=( + const RtpPacketReceived& packet) = default; +RtpPacketReceived& RtpPacketReceived::operator=(RtpPacketReceived&& packet) = + default; + +RtpPacketReceived::~RtpPacketReceived() {} diff --git a/src/rtp/rtp_packet/rtp_packet_received.h b/src/rtp/rtp_packet/rtp_packet_received.h new file mode 100644 index 0000000..926251a --- /dev/null +++ b/src/rtp/rtp_packet/rtp_packet_received.h @@ -0,0 +1,31 @@ +/* + * @Author: DI JUNKUN + * @Date: 2024-12-18 + * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _RTP_PACKET_RECEIVED_H_ +#define _RTP_PACKET_RECEIVED_H_ + +#include + +#include "rtp_packet.h" + +class RtpPacketReceived : public RtpPacket { + public: + RtpPacketReceived(); + explicit RtpPacketReceived( + int64_t arrival_time = std::numeric_limits::min()); + RtpPacketReceived(const RtpPacketReceived& packet); + RtpPacketReceived(RtpPacketReceived&& packet); + + RtpPacketReceived& operator=(const RtpPacketReceived& packet); + RtpPacketReceived& operator=(RtpPacketReceived&& packet); + + ~RtpPacketReceived(); + + private: + int64_t arrival_time_ = std::numeric_limits::min(); +}; + +#endif \ No newline at end of file diff --git a/xmake.lua b/xmake.lua index b266e93..edb7ab0 100644 --- a/xmake.lua +++ b/xmake.lua @@ -79,8 +79,12 @@ target("statistics") target("rtcp") set_kind("object") add_deps("log", "common") - add_files("src/rtcp/*.cpp") - add_includedirs("src/rtcp", {public = true}) + add_files("src/rtcp/*.cpp", + "src/rtcp/rtcp_packet/*.cpp", + "src/rtcp/rtp_feedback/*.cpp") + add_includedirs("src/rtcp", + "src/rtcp/rtcp_packet", + "src/rtcp/rtp_feedback", {public = true}) target("rtp") set_kind("object")