diff --git a/src/channel/rtp_channel/rtp_video_receiver.cpp b/src/channel/rtp_channel/rtp_video_receiver.cpp index 537dd08..b99639d 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.cpp +++ b/src/channel/rtp_channel/rtp_video_receiver.cpp @@ -2,6 +2,7 @@ #include "common.h" #include "log.h" +#include "nack.h" #include "rtcp_sender.h" // #define SAVE_RTP_RECV_STREAM @@ -21,9 +22,12 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock) SendRemb(bitrate_bps, ssrcs); }), clock_(clock), - rtcp_feedback_buffer_(this, this, this), - nack_(std::make_unique(clock, &rtcp_feedback_buffer_, - &rtcp_feedback_buffer_)) { + rtcp_sender_(std::make_unique( + [this](const uint8_t* buffer, size_t size) -> int { + return data_send_func_((const char*)buffer, size); + }, + 1200)), + nack_(std::make_unique(clock, this, this)) { SetPeriod(std::chrono::milliseconds(5)); // rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); } @@ -41,9 +45,12 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock, SendRemb(bitrate_bps, ssrcs); }), clock_(clock), - rtcp_feedback_buffer_(this, this, this), - nack_(std::make_unique(clock, &rtcp_feedback_buffer_, - &rtcp_feedback_buffer_)) { + rtcp_sender_(std::make_unique( + [this](const uint8_t* buffer, size_t size) -> int { + return data_send_func_((const char*)buffer, size); + }, + 1200)), + nack_(std::make_unique(clock, this, this)) { SetPeriod(std::chrono::milliseconds(5)); // rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); @@ -83,6 +90,8 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { rtp_statistics_->Start(); } + remote_ssrc_ = rtp_packet.Ssrc(); + #ifdef SAVE_RTP_RECV_STREAM fwrite((unsigned char*)rtp_packet.Payload(), 1, rtp_packet.PayloadSize(), file_rtp_recv_); @@ -90,7 +99,6 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { webrtc::RtpPacketReceived rtp_packet_received; rtp_packet_received.Build(rtp_packet.Buffer().data(), rtp_packet.Size()); - rtp_packet_received.set_arrival_time(clock_->CurrentTime()); rtp_packet_received.set_ecn(EcnMarking::kEct0); rtp_packet_received.set_recovered(false); @@ -98,6 +106,8 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { receive_side_congestion_controller_.OnReceivedPacket(rtp_packet_received, MediaType::VIDEO); + nack_->OnReceivedPacket(rtp_packet.SequenceNumber()); + last_recv_bytes_ = (uint32_t)rtp_packet.PayloadSize(); total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize(); total_rtp_packets_recv_++; @@ -434,16 +444,10 @@ void RtpVideoReceiver::SendCombinedRtcpPacket( // LOG_ERROR("Send combined rtcp packet"); - RTCPSender rtcp_sender( - [this](const uint8_t* buffer, size_t size) -> int { - return data_send_func_((const char*)buffer, size); - }, - 1200); - for (auto& rtcp_packet : rtcp_packets) { rtcp_packet->SetSenderSsrc(feedback_ssrc_); - rtcp_sender.AppendPacket(*rtcp_packet); - rtcp_sender.Send(); + rtcp_sender_->AppendPacket(*rtcp_packet); + rtcp_sender_->Send(); } } @@ -517,4 +521,26 @@ void RtpVideoReceiver::RtcpThread() { } } } -} \ No newline at end of file +} + +/******************************************************************************/ + +void RtpVideoReceiver::SendNack(const std::vector& nack_list, + bool buffering_allowed) { + if (!nack_list.empty()) { + webrtc::rtcp::Nack nack; + nack.SetSenderSsrc(feedback_ssrc_); + nack.SetMediaSsrc(remote_ssrc_); + nack.SetPacketIds(std::move(nack_list)); + + rtcp_sender_->AppendPacket(nack); + rtcp_sender_->Send(); + } +} + +void RtpVideoReceiver::RequestKeyFrame() {} + +void RtpVideoReceiver::SendLossNotification(uint16_t last_decoded_seq_num, + uint16_t last_received_seq_num, + bool decodability_flag, + bool buffering_allowed) {} diff --git a/src/channel/rtp_channel/rtp_video_receiver.h b/src/channel/rtp_channel/rtp_video_receiver.h index 6db7142..bca6e12 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.h +++ b/src/channel/rtp_channel/rtp_video_receiver.h @@ -13,6 +13,7 @@ #include "receive_side_congestion_controller.h" #include "ringbuffer.h" #include "rtcp_receiver_report.h" +#include "rtcp_sender.h" #include "rtp_packet_av1.h" #include "rtp_packet_h264.h" #include "rtp_rtcp_defines.h" @@ -63,6 +64,15 @@ class RtpVideoReceiver : public ThreadBase, bool Process() override; void RtcpThread(); + private: + void SendNack(const std::vector& nack_list, bool buffering_allowed); + + void RequestKeyFrame(); + + void SendLossNotification(uint16_t last_decoded_seq_num, + uint16_t last_received_seq_num, + bool decodability_flag, bool buffering_allowed); + private: std::map incomplete_h264_frame_list_; std::map incomplete_av1_frame_list_; @@ -107,66 +117,11 @@ class RtpVideoReceiver : public ThreadBase, ReceiveSideCongestionController receive_side_congestion_controller_; RtcpFeedbackSenderInterface* active_remb_module_; uint32_t feedback_ssrc_ = 0; + uint32_t remote_ssrc_ = 0; + std::unique_ptr rtcp_sender_; std::unique_ptr nack_; - private: - class RtcpFeedbackBuffer : public KeyFrameRequestSender, - public NackSender, - public LossNotificationSender { - public: - RtcpFeedbackBuffer(KeyFrameRequestSender* key_frame_request_sender, - NackSender* nack_sender, - LossNotificationSender* loss_notification_sender); - - ~RtcpFeedbackBuffer() override = default; - - // KeyFrameRequestSender implementation. - void RequestKeyFrame() override; - - // NackSender implementation. - void SendNack(const std::vector& sequence_numbers, - bool buffering_allowed) override; - - // LossNotificationSender implementation. - void SendLossNotification(uint16_t last_decoded_seq_num, - uint16_t last_received_seq_num, - bool decodability_flag, - bool buffering_allowed) override; - - // Send all RTCP feedback messages buffered thus far. - void SendBufferedRtcpFeedback(); - - void ClearLossNotificationState(); - - private: - // LNTF-related state. - struct LossNotificationState { - LossNotificationState(uint16_t last_decoded_seq_num, - uint16_t last_received_seq_num, - bool decodability_flag) - : last_decoded_seq_num(last_decoded_seq_num), - last_received_seq_num(last_received_seq_num), - decodability_flag(decodability_flag) {} - - uint16_t last_decoded_seq_num; - uint16_t last_received_seq_num; - bool decodability_flag; - }; - - KeyFrameRequestSender* const key_frame_request_sender_; - NackSender* const nack_sender_; - LossNotificationSender* const loss_notification_sender_; - - // Key-frame-request-related state. - bool request_key_frame_; - - // NACK-related state. - std::vector nack_sequence_numbers_; - std::optional lntf_state_; - }; - RtcpFeedbackBuffer rtcp_feedback_buffer_; - private: FILE* file_rtp_recv_ = nullptr; }; diff --git a/src/qos/nack_requester.cc b/src/qos/nack_requester.cc index e52f1ab..a2890c5 100644 --- a/src/qos/nack_requester.cc +++ b/src/qos/nack_requester.cc @@ -38,7 +38,8 @@ NackRequester::NackInfo::NackInfo(uint16_t seq_num, uint16_t send_at_seq_num, sent_at_time(Timestamp::MinusInfinity()), retries(0) {} -NackRequester::NackRequester(Clock* clock, NackSender* nack_sender, +NackRequester::NackRequester(std::shared_ptr clock, + NackSender* nack_sender, KeyFrameRequestSender* keyframe_request_sender) : clock_(clock), nack_sender_(nack_sender), @@ -96,9 +97,7 @@ int NackRequester::OnReceivedPacket(uint16_t seq_num, bool is_recovered) { // Are there any nacks that are waiting for this seq_num. std::vector nack_batch = GetNackBatch(kSeqNumOnly); if (!nack_batch.empty()) { - // This batch of NACKs is triggered externally; the initiator can - // batch them with other feedback messages. - nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/true); + nack_sender_->SendNack(nack_batch, false); } return 0; diff --git a/src/qos/nack_requester.h b/src/qos/nack_requester.h index 9617e33..d1f0e18 100644 --- a/src/qos/nack_requester.h +++ b/src/qos/nack_requester.h @@ -40,7 +40,7 @@ class NackRequester { }; public: - NackRequester(Clock* clock, NackSender* nack_sender, + NackRequester(std::shared_ptr clock, NackSender* nack_sender, KeyFrameRequestSender* keyframe_request_sender); ~NackRequester(); @@ -57,7 +57,7 @@ class NackRequester { int WaitNumberOfPackets(float probability) const; private: - Clock* const clock_; + std::shared_ptr clock_; NackSender* const nack_sender_; KeyFrameRequestSender* const keyframe_request_sender_; diff --git a/src/qos/congestion_control_feedback.cpp b/src/rtcp/congestion_control_feedback.cpp similarity index 100% rename from src/qos/congestion_control_feedback.cpp rename to src/rtcp/congestion_control_feedback.cpp diff --git a/src/qos/congestion_control_feedback.h b/src/rtcp/congestion_control_feedback.h similarity index 100% rename from src/qos/congestion_control_feedback.h rename to src/rtcp/congestion_control_feedback.h diff --git a/src/rtcp/nack.cpp b/src/rtcp/nack.cpp new file mode 100644 index 0000000..027788f --- /dev/null +++ b/src/rtcp/nack.cpp @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "nack.h" + +#include +#include +#include + +#include "byte_io.h" +#include "common_header.h" +#include "log.h" + +namespace webrtc { +namespace rtcp { +// RFC 4585: Feedback format. +// +// Common packet format: +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |V=2|P| FMT | PT | length | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// 0 | SSRC of packet sender | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// 4 | SSRC of media source | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// : Feedback Control Information (FCI) : +// : : +// +// Generic NACK (RFC 4585). +// +// FCI: +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | PID | BLP | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +Nack::Nack() = default; +Nack::Nack(const Nack& rhs) = default; +Nack::~Nack() = default; + +bool Nack::Parse(const CommonHeader& packet) { + if (packet.payload_size_bytes() < kCommonFeedbackLength + kNackItemLength) { + LOG_WARN("Payload length {} is too small for a Nack.", + packet.payload_size_bytes()); + return false; + } + size_t nack_items = + (packet.payload_size_bytes() - kCommonFeedbackLength) / kNackItemLength; + + ParseCommonFeedback(packet.payload()); + const uint8_t* next_nack = packet.payload() + kCommonFeedbackLength; + + packet_ids_.clear(); + packed_.resize(nack_items); + for (size_t index = 0; index < nack_items; ++index) { + packed_[index].first_pid = ByteReader::ReadBigEndian(next_nack); + packed_[index].bitmask = ByteReader::ReadBigEndian(next_nack + 2); + next_nack += kNackItemLength; + } + Unpack(); + + return true; +} + +size_t Nack::BlockLength() const { + return kHeaderLength + kCommonFeedbackLength + + packed_.size() * kNackItemLength; +} + +bool Nack::Create(uint8_t* packet, size_t* index, size_t max_length, + PacketReadyCallback callback) const { + // If nack list can't fit in packet, try to fragment. + constexpr size_t kNackHeaderLength = kHeaderLength + kCommonFeedbackLength; + for (size_t nack_index = 0; nack_index < packed_.size();) { + size_t bytes_left_in_buffer = max_length - *index; + if (bytes_left_in_buffer < kNackHeaderLength + kNackItemLength) { + if (!OnBufferFull(packet, index, callback)) return false; + continue; + } + size_t num_nack_fields = + std::min((bytes_left_in_buffer - kNackHeaderLength) / kNackItemLength, + packed_.size() - nack_index); + + size_t payload_size_bytes = + kCommonFeedbackLength + (num_nack_fields * kNackItemLength); + size_t payload_size_32bits = payload_size_bytes / 4; + CreateHeader(kFeedbackMessageType, kPacketType, payload_size_32bits, packet, + index); + + CreateCommonFeedback(packet + *index); + *index += kCommonFeedbackLength; + + size_t nack_end_index = nack_index + num_nack_fields; + for (; nack_index < nack_end_index; ++nack_index) { + const PackedNack& item = packed_[nack_index]; + ByteWriter::WriteBigEndian(packet + *index + 0, item.first_pid); + ByteWriter::WriteBigEndian(packet + *index + 2, item.bitmask); + *index += kNackItemLength; + } + } + + return true; +} + +void Nack::SetPacketIds(const uint16_t* nack_list, size_t length) { + SetPacketIds(std::vector(nack_list, nack_list + length)); +} + +void Nack::SetPacketIds(std::vector nack_list) { + packet_ids_ = std::move(nack_list); + Pack(); +} + +void Nack::Pack() { + auto it = packet_ids_.begin(); + const auto end = packet_ids_.end(); + while (it != end) { + PackedNack item; + item.first_pid = *it++; + // Bitmask specifies losses in any of the 16 packets following the pid. + item.bitmask = 0; + while (it != end) { + uint16_t shift = static_cast(*it - item.first_pid - 1); + if (shift <= 15) { + item.bitmask |= (1 << shift); + ++it; + } else { + break; + } + } + packed_.push_back(item); + } +} + +void Nack::Unpack() { + for (const PackedNack& item : packed_) { + packet_ids_.push_back(item.first_pid); + uint16_t pid = item.first_pid + 1; + for (uint16_t bitmask = item.bitmask; bitmask != 0; bitmask >>= 1, ++pid) { + if (bitmask & 1) packet_ids_.push_back(pid); + } + } +} + +} // namespace rtcp +} // namespace webrtc diff --git a/src/rtcp/nack.h b/src/rtcp/nack.h new file mode 100644 index 0000000..3f08c25 --- /dev/null +++ b/src/rtcp/nack.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_RTP_RTCP_SOURCE_RTCP_PACKET_NACK_H_ +#define MODULES_RTP_RTCP_SOURCE_RTCP_PACKET_NACK_H_ + +#include + +#include "common_header.h" +#include "rtp_feedback.h" + +namespace webrtc { +namespace rtcp { +class Nack : public RtpFeedback { + public: + static constexpr uint8_t kFeedbackMessageType = 1; + Nack(); + Nack(const Nack&); + ~Nack() override; + + // Parse assumes header is already parsed and validated. + bool Parse(const CommonHeader& packet); + + void SetPacketIds(const uint16_t* nack_list, size_t length); + void SetPacketIds(std::vector nack_list); + const std::vector& packet_ids() const { return packet_ids_; } + + size_t BlockLength() const override; + + bool Create(uint8_t* packet, size_t* index, size_t max_length, + PacketReadyCallback callback) const override; + + private: + static constexpr size_t kNackItemLength = 4; + struct PackedNack { + uint16_t first_pid; + uint16_t bitmask; + }; + + void Pack(); // Fills packed_ using packed_ids_. (used in SetPacketIds). + void Unpack(); // Fills packet_ids_ using packed_. (used in Parse). + + std::vector packed_; + std::vector packet_ids_; +}; + +} // namespace rtcp +} // namespace webrtc +#endif // MODULES_RTP_RTCP_SOURCE_RTCP_PACKET_NACK_H_ diff --git a/src/rtcp/rtcp_packet/rtcp_packet.h b/src/rtcp/rtcp_packet/rtcp_packet.h index 0bf32e3..a771b24 100644 --- a/src/rtcp/rtcp_packet/rtcp_packet.h +++ b/src/rtcp/rtcp_packet/rtcp_packet.h @@ -16,7 +16,7 @@ class RtcpPacket { public: - typedef enum { SR = 200, RR = 201, TCC = 205 } PAYLOAD_TYPE; + typedef enum { SR = 200, RR = 201, TCC = 11, NACK = 1 } PAYLOAD_TYPE; // Callback used to signal that an RTCP packet is ready. Note that this may // not contain all data in this RtcpPacket; if a packet cannot fit in // max_length bytes, it will be fragmented and multiple calls to this @@ -61,7 +61,4 @@ class RtcpPacket { uint32_t sender_ssrc_ = 0; }; -using RtcpSender = - std::function> packets)>; - #endif \ No newline at end of file diff --git a/src/rtcp/rtcp_packet/rtcp_packet_info.h b/src/rtcp/rtcp_packet/rtcp_packet_info.h index 40e315f..346a64c 100644 --- a/src/rtcp/rtcp_packet/rtcp_packet_info.h +++ b/src/rtcp/rtcp_packet/rtcp_packet_info.h @@ -15,6 +15,7 @@ #include #include "congestion_control_feedback.h" +#include "nack.h" struct RtcpPacketInfo { uint32_t packet_type_flags = 0; // RTCPPacketTypeFlags bit field. diff --git a/src/rtcp/rtcp_sender/rtcp_sender.h b/src/rtcp/rtcp_sender/rtcp_sender.h index 736db63..ce760c1 100644 --- a/src/rtcp/rtcp_sender/rtcp_sender.h +++ b/src/rtcp/rtcp_sender/rtcp_sender.h @@ -14,16 +14,16 @@ #include "log.h" -class RTCPSender { +class RtcpSender { public: - RTCPSender(std::function callback, + RtcpSender(std::function callback, size_t max_packet_size) : callback_(callback), max_packet_size_(max_packet_size) { if (max_packet_size >= IP_PACKET_SIZE) { LOG_ERROR("max_packet_size must be less than IP_PACKET_SIZE"); } } - ~RTCPSender() { + ~RtcpSender() { if (index_ != 0) { LOG_ERROR("Unsent rtcp packet"); } diff --git a/src/transport/ice_transport.cpp b/src/transport/ice_transport.cpp index bf40d8d..199d3d7 100644 --- a/src/transport/ice_transport.cpp +++ b/src/transport/ice_transport.cpp @@ -232,15 +232,19 @@ bool IceTransport::ParseRtcpPacket(const uint8_t *buffer, size_t size, LOG_INFO("Receiver report"); // valid = HandleReceiverReport(rtcp_block, rtcp_packet_info); break; - case RtcpPacket::PAYLOAD_TYPE::TCC: + case RtpFeedback::kPacketType: switch (rtcp_block.fmt()) { case webrtc::rtcp::CongestionControlFeedback::kFeedbackMessageType: valid = HandleCongestionControlFeedback(rtcp_block, rtcp_packet_info); break; + case webrtc::rtcp::Nack::kFeedbackMessageType: + valid = HandleNack(rtcp_block, rtcp_packet_info); + break; default: break; } break; + // case rtcp::Psfb::kPacketType: // switch (rtcp_block.fmt()) { // case rtcp::Pli::kFeedbackMessageType: @@ -314,6 +318,24 @@ bool IceTransport::HandleCongestionControlFeedback( return true; } +bool IceTransport::HandleNack(const webrtc::rtcp::CommonHeader &rtcp_block, + RtcpPacketInfo *rtcp_packet_info) { + webrtc::rtcp::Nack nack; + if (!nack.Parse(rtcp_block)) { + return false; + } + + // uint32_t first_media_source_ssrc = nack.ssrc(); + // if (first_media_source_ssrc == local_media_ssrc() || + // registered_ssrcs_.contains(first_media_source_ssrc)) { + // rtcp_packet_info->nack.emplace(std::move(nack)); + // } + + LOG_INFO("Nack [{}]", nack.packet_ids().size()); + + return true; +} + int IceTransport::DestroyIceTransmission() { LOG_INFO("[{}->{}] Destroy ice transmission", user_id_, remote_user_id_); is_closed_ = true; diff --git a/src/transport/ice_transport.h b/src/transport/ice_transport.h index a8f8ee3..d0e8016 100644 --- a/src/transport/ice_transport.h +++ b/src/transport/ice_transport.h @@ -132,6 +132,9 @@ class IceTransport { const webrtc::rtcp::CommonHeader &rtcp_block, RtcpPacketInfo *rtcp_packet_info); + bool HandleNack(const webrtc::rtcp::CommonHeader &rtcp_block, + RtcpPacketInfo *rtcp_packet_info); + private: bool hardware_acceleration_ = false; bool use_trickle_ice_ = true; diff --git a/src/transport/ice_transport_controller.cpp b/src/transport/ice_transport_controller.cpp index eaa89a9..0f03203 100644 --- a/src/transport/ice_transport_controller.cpp +++ b/src/transport/ice_transport_controller.cpp @@ -374,7 +374,7 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) { target_bitrate_ = update.target_rate.has_value() ? update.target_rate->target_rate.bps() : 0; - // LOG_WARN("Target bitrate [{}]bps", target_bitrate_); + LOG_WARN("Target bitrate [{}]bps", target_bitrate_); video_encoder_->SetTargetBitrate(target_bitrate_); }