From 1db57bfc76be6d843a81dd34a7390e8bb30c13cb Mon Sep 17 00:00:00 2001 From: dijunkun Date: Wed, 12 Feb 2025 17:35:59 +0800 Subject: [PATCH] [feat] add nack module --- .../rtp_channel/rtp_video_receiver.cpp | 10 +- src/channel/rtp_channel/rtp_video_receiver.h | 65 ++++++- src/qos/histogram.cc | 48 +++++ src/qos/histogram.h | 44 +++++ src/qos/nack_requester.cc | 182 ++++++++++++++++++ src/qos/nack_requester.h | 73 +++++++ ...emote_bitrate_estimator_single_stream.ccxx | 2 +- src/transport/ice_transport_controller.h | 2 +- 8 files changed, 421 insertions(+), 5 deletions(-) create mode 100644 src/qos/histogram.cc create mode 100644 src/qos/histogram.h create mode 100644 src/qos/nack_requester.cc create mode 100644 src/qos/nack_requester.h diff --git a/src/channel/rtp_channel/rtp_video_receiver.cpp b/src/channel/rtp_channel/rtp_video_receiver.cpp index 97c1ad5..537dd08 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.cpp +++ b/src/channel/rtp_channel/rtp_video_receiver.cpp @@ -20,7 +20,10 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock) [this](int64_t bitrate_bps, std::vector ssrcs) { SendRemb(bitrate_bps, ssrcs); }), - clock_(clock) { + clock_(clock), + rtcp_feedback_buffer_(this, this, this), + nack_(std::make_unique(clock, &rtcp_feedback_buffer_, + &rtcp_feedback_buffer_)) { SetPeriod(std::chrono::milliseconds(5)); // rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); } @@ -37,7 +40,10 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock, [this](int64_t bitrate_bps, std::vector ssrcs) { SendRemb(bitrate_bps, ssrcs); }), - clock_(clock) { + clock_(clock), + rtcp_feedback_buffer_(this, this, this), + nack_(std::make_unique(clock, &rtcp_feedback_buffer_, + &rtcp_feedback_buffer_)) { SetPeriod(std::chrono::milliseconds(5)); // rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); diff --git a/src/channel/rtp_channel/rtp_video_receiver.h b/src/channel/rtp_channel/rtp_video_receiver.h index 0f6d92b..6db7142 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.h +++ b/src/channel/rtp_channel/rtp_video_receiver.h @@ -9,6 +9,7 @@ #include "clock.h" #include "fec_decoder.h" #include "io_statistics.h" +#include "nack_requester.h" #include "receive_side_congestion_controller.h" #include "ringbuffer.h" #include "rtcp_receiver_report.h" @@ -21,7 +22,10 @@ using namespace webrtc; -class RtpVideoReceiver : public ThreadBase { +class RtpVideoReceiver : public ThreadBase, + public LossNotificationSender, + public KeyFrameRequestSender, + public NackSender { public: RtpVideoReceiver(std::shared_ptr clock); RtpVideoReceiver(std::shared_ptr clock, @@ -104,6 +108,65 @@ class RtpVideoReceiver : public ThreadBase { RtcpFeedbackSenderInterface* active_remb_module_; uint32_t feedback_ssrc_ = 0; + std::unique_ptr nack_; + + private: + class RtcpFeedbackBuffer : public KeyFrameRequestSender, + public NackSender, + public LossNotificationSender { + public: + RtcpFeedbackBuffer(KeyFrameRequestSender* key_frame_request_sender, + NackSender* nack_sender, + LossNotificationSender* loss_notification_sender); + + ~RtcpFeedbackBuffer() override = default; + + // KeyFrameRequestSender implementation. + void RequestKeyFrame() override; + + // NackSender implementation. + void SendNack(const std::vector& sequence_numbers, + bool buffering_allowed) override; + + // LossNotificationSender implementation. + void SendLossNotification(uint16_t last_decoded_seq_num, + uint16_t last_received_seq_num, + bool decodability_flag, + bool buffering_allowed) override; + + // Send all RTCP feedback messages buffered thus far. + void SendBufferedRtcpFeedback(); + + void ClearLossNotificationState(); + + private: + // LNTF-related state. + struct LossNotificationState { + LossNotificationState(uint16_t last_decoded_seq_num, + uint16_t last_received_seq_num, + bool decodability_flag) + : last_decoded_seq_num(last_decoded_seq_num), + last_received_seq_num(last_received_seq_num), + decodability_flag(decodability_flag) {} + + uint16_t last_decoded_seq_num; + uint16_t last_received_seq_num; + bool decodability_flag; + }; + + KeyFrameRequestSender* const key_frame_request_sender_; + NackSender* const nack_sender_; + LossNotificationSender* const loss_notification_sender_; + + // Key-frame-request-related state. + bool request_key_frame_; + + // NACK-related state. + std::vector nack_sequence_numbers_; + std::optional lntf_state_; + }; + RtcpFeedbackBuffer rtcp_feedback_buffer_; + private: FILE* file_rtp_recv_ = nullptr; }; diff --git a/src/qos/histogram.cc b/src/qos/histogram.cc new file mode 100644 index 0000000..3ba774c --- /dev/null +++ b/src/qos/histogram.cc @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "histogram.h" + +#include + +namespace webrtc { +Histogram::Histogram(size_t num_buckets, size_t max_num_values) { + buckets_.resize(num_buckets); + values_.reserve(max_num_values); + index_ = 0; +} + +void Histogram::Add(size_t value) { + value = std::min(value, buckets_.size() - 1); + if (index_ < values_.size()) { + --buckets_[values_[index_]]; + values_[index_] = value; + } else { + values_.emplace_back(value); + } + + ++buckets_[value]; + index_ = (index_ + 1) % values_.capacity(); +} + +size_t Histogram::InverseCdf(float probability) const { + size_t bucket = 0; + float accumulated_probability = 0; + while (accumulated_probability < probability && bucket < buckets_.size()) { + accumulated_probability += + static_cast(buckets_[bucket]) / values_.size(); + ++bucket; + } + return bucket; +} + +size_t Histogram::NumValues() const { return values_.size(); } + +} // namespace webrtc diff --git a/src/qos/histogram.h b/src/qos/histogram.h new file mode 100644 index 0000000..7914a75 --- /dev/null +++ b/src/qos/histogram.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_VIDEO_CODING_HISTOGRAM_H_ +#define MODULES_VIDEO_CODING_HISTOGRAM_H_ + +#include +#include + +namespace webrtc { +class Histogram { + public: + // A discrete histogram where every bucket with range [0, num_buckets). + // Values greater or equal to num_buckets will be placed in the last bucket. + Histogram(size_t num_buckets, size_t max_num_values); + + // Add a value to the histogram. If there already is max_num_values in the + // histogram then the oldest value will be replaced with the new value. + void Add(size_t value); + + // Calculates how many buckets have to be summed in order to accumulate at + // least the given probability. + size_t InverseCdf(float probability) const; + + // How many values that make up this histogram. + size_t NumValues() const; + + private: + // A circular buffer that holds the values that make up the histogram. + std::vector values_; + std::vector buckets_; + size_t index_; +}; + +} // namespace webrtc + +#endif // MODULES_VIDEO_CODING_HISTOGRAM_H_ diff --git a/src/qos/nack_requester.cc b/src/qos/nack_requester.cc new file mode 100644 index 0000000..e52f1ab --- /dev/null +++ b/src/qos/nack_requester.cc @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "nack_requester.h" + +#include "log.h" + +constexpr int kMaxPacketAge = 10'000; +constexpr int kMaxNackPackets = 1000; +constexpr TimeDelta kDefaultRtt = TimeDelta::Millis(100); +// Number of times a packet can be nacked before giving up. Nack is sent at most +// every RTT. +constexpr int kMaxNackRetries = 100; +constexpr int kMaxReorderedPackets = 128; +constexpr int kNumReorderingBuckets = 10; +// constexpr TimeDelta kDefaultSendNackDelay = TimeDelta::Zero(); +constexpr TimeDelta kDefaultSendNackDelay = TimeDelta::Millis(10); + +NackRequester::NackInfo::NackInfo() + : seq_num(0), + send_at_seq_num(0), + created_at_time(Timestamp::MinusInfinity()), + sent_at_time(Timestamp::MinusInfinity()), + retries(0) {} + +NackRequester::NackInfo::NackInfo(uint16_t seq_num, uint16_t send_at_seq_num, + Timestamp created_at_time) + : seq_num(seq_num), + send_at_seq_num(send_at_seq_num), + created_at_time(created_at_time), + sent_at_time(Timestamp::MinusInfinity()), + retries(0) {} + +NackRequester::NackRequester(Clock* clock, NackSender* nack_sender, + KeyFrameRequestSender* keyframe_request_sender) + : clock_(clock), + nack_sender_(nack_sender), + keyframe_request_sender_(keyframe_request_sender), + reordering_histogram_(kNumReorderingBuckets, kMaxReorderedPackets), + initialized_(false), + rtt_(kDefaultRtt), + newest_seq_num_(0), + send_nack_delay_(kDefaultSendNackDelay) {} + +NackRequester::~NackRequester() {} + +int NackRequester::OnReceivedPacket(uint16_t seq_num) { + return OnReceivedPacket(seq_num, false); +} + +int NackRequester::OnReceivedPacket(uint16_t seq_num, bool is_recovered) { + bool is_retransmitted = true; + + if (!initialized_) { + newest_seq_num_ = seq_num; + initialized_ = true; + return 0; + } + + if (seq_num == newest_seq_num_) return 0; + + if (AheadOf(newest_seq_num_, seq_num)) { + // An out of order packet has been received. + auto nack_list_it = nack_list_.find(seq_num); + int nacks_sent_for_packet = 0; + if (nack_list_it != nack_list_.end()) { + nacks_sent_for_packet = nack_list_it->second.retries; + nack_list_.erase(nack_list_it); + } + if (!is_retransmitted) UpdateReorderingStatistics(seq_num); + return nacks_sent_for_packet; + } + + if (is_recovered) { + recovered_list_.insert(seq_num); + + // Remove old ones so we don't accumulate recovered packets. + auto it = recovered_list_.lower_bound(seq_num - kMaxPacketAge); + if (it != recovered_list_.begin()) + recovered_list_.erase(recovered_list_.begin(), it); + + // Do not send nack for packets recovered by FEC or RTX. + return 0; + } + + AddPacketsToNack(newest_seq_num_ + 1, seq_num); + newest_seq_num_ = seq_num; + + // Are there any nacks that are waiting for this seq_num. + std::vector nack_batch = GetNackBatch(kSeqNumOnly); + if (!nack_batch.empty()) { + // This batch of NACKs is triggered externally; the initiator can + // batch them with other feedback messages. + nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/true); + } + + return 0; +} + +void NackRequester::ClearUpTo(uint16_t seq_num) { + nack_list_.erase(nack_list_.begin(), nack_list_.lower_bound(seq_num)); + recovered_list_.erase(recovered_list_.begin(), + recovered_list_.lower_bound(seq_num)); +} + +void NackRequester::UpdateRtt(int64_t rtt_ms) { + rtt_ = TimeDelta::Millis(rtt_ms); +} + +void NackRequester::AddPacketsToNack(uint16_t seq_num_start, + uint16_t seq_num_end) { + // Remove old packets. + auto it = nack_list_.lower_bound(seq_num_end - kMaxPacketAge); + nack_list_.erase(nack_list_.begin(), it); + + uint16_t num_new_nacks = ForwardDiff(seq_num_start, seq_num_end); + if (nack_list_.size() + num_new_nacks > kMaxNackPackets) { + nack_list_.clear(); + LOG_WARN("NACK list full, clearing NACK list and requesting keyframe."); + keyframe_request_sender_->RequestKeyFrame(); + return; + } + + for (uint16_t seq_num = seq_num_start; seq_num != seq_num_end; ++seq_num) { + // Do not send nack for packets that are already recovered by FEC or RTX + if (recovered_list_.find(seq_num) != recovered_list_.end()) continue; + NackInfo nack_info(seq_num, seq_num + WaitNumberOfPackets(0.5), + clock_->CurrentTime()); + nack_list_[seq_num] = nack_info; + } +} + +std::vector NackRequester::GetNackBatch(NackFilterOptions options) { + // Called on worker_thread_. + + bool consider_seq_num = options != kTimeOnly; + bool consider_timestamp = options != kSeqNumOnly; + Timestamp now = clock_->CurrentTime(); + std::vector nack_batch; + auto it = nack_list_.begin(); + while (it != nack_list_.end()) { + bool delay_timed_out = now - it->second.created_at_time >= send_nack_delay_; + bool nack_on_rtt_passed = now - it->second.sent_at_time >= rtt_; + bool nack_on_seq_num_passed = + it->second.sent_at_time.IsInfinite() && + AheadOrAt(newest_seq_num_, it->second.send_at_seq_num); + if (delay_timed_out && ((consider_seq_num && nack_on_seq_num_passed) || + (consider_timestamp && nack_on_rtt_passed))) { + nack_batch.emplace_back(it->second.seq_num); + ++it->second.retries; + it->second.sent_at_time = now; + if (it->second.retries >= kMaxNackRetries) { + LOG_WARN( + "Sequence number {} removed from NACK list due to max retries.", + it->second.seq_num); + it = nack_list_.erase(it); + } else { + ++it; + } + continue; + } + ++it; + } + return nack_batch; +} + +void NackRequester::UpdateReorderingStatistics(uint16_t seq_num) { + uint16_t diff = ReverseDiff(newest_seq_num_, seq_num); + reordering_histogram_.Add(diff); +} + +int NackRequester::WaitNumberOfPackets(float probability) const { + if (reordering_histogram_.NumValues() == 0) return 0; + return reordering_histogram_.InverseCdf(probability); +} \ No newline at end of file diff --git a/src/qos/nack_requester.h b/src/qos/nack_requester.h new file mode 100644 index 0000000..9617e33 --- /dev/null +++ b/src/qos/nack_requester.h @@ -0,0 +1,73 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-02-12 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _NACK_REQUESTER_H_ +#define _NACK_REQUESTER_H_ + +#include +#include +#include +#include +#include + +#include "api/units/timestamp.h" +#include "clock.h" +#include "histogram.h" +#include "module_common_types.h" +#include "rtc_base/numerics/sequence_number_util.h" + +using namespace webrtc; + +class NackRequester { + private: + // Which fields to consider when deciding which packet to nack in + // GetNackBatch. + enum NackFilterOptions { kSeqNumOnly, kTimeOnly, kSeqNumAndTime }; + + struct NackInfo { + NackInfo(); + NackInfo(uint16_t seq_num, uint16_t send_at_seq_num, + Timestamp created_at_time); + + uint16_t seq_num; + uint16_t send_at_seq_num; + Timestamp created_at_time; + Timestamp sent_at_time; + int retries; + }; + + public: + NackRequester(Clock* clock, NackSender* nack_sender, + KeyFrameRequestSender* keyframe_request_sender); + ~NackRequester(); + + public: + int OnReceivedPacket(uint16_t seq_num); + int OnReceivedPacket(uint16_t seq_num, bool is_recovered); + + private: + void ClearUpTo(uint16_t seq_num); + void UpdateRtt(int64_t rtt_ms); + void AddPacketsToNack(uint16_t seq_num_start, uint16_t seq_num_end); + std::vector GetNackBatch(NackFilterOptions options); + void UpdateReorderingStatistics(uint16_t seq_num); + int WaitNumberOfPackets(float probability) const; + + private: + Clock* const clock_; + NackSender* const nack_sender_; + KeyFrameRequestSender* const keyframe_request_sender_; + + std::map> nack_list_; + std::set> recovered_list_; + Histogram reordering_histogram_; + bool initialized_; + TimeDelta rtt_; + uint16_t newest_seq_num_; + const TimeDelta send_nack_delay_; +}; + +#endif \ No newline at end of file diff --git a/src/qos/remote_bitrate_estimator_single_stream.ccxx b/src/qos/remote_bitrate_estimator_single_stream.ccxx index 9c3006f..e23c22a 100644 --- a/src/qos/remote_bitrate_estimator_single_stream.ccxx +++ b/src/qos/remote_bitrate_estimator_single_stream.ccxx @@ -16,7 +16,7 @@ #include "aimd_rate_control.h" #include "bwe_defines.h" -#include "clock.h" +#include “clock.h" #include "inter_arrival.h" #include "log.h" #include "overuse_detector.h" diff --git a/src/transport/ice_transport_controller.h b/src/transport/ice_transport_controller.h index 3350fd1..39a9678 100644 --- a/src/transport/ice_transport_controller.h +++ b/src/transport/ice_transport_controller.h @@ -115,7 +115,7 @@ class IceTransportController bool audio_codec_inited_ = false; private: - uint64_t target_bitrate_ = 0; + int64_t target_bitrate_ = 0; }; #endif \ No newline at end of file