diff --git a/src/qos/prioritized_packet_queue.cc b/src/qos/prioritized_packet_queue.cc index 18b2cfa..e7abfb5 100644 --- a/src/qos/prioritized_packet_queue.cc +++ b/src/qos/prioritized_packet_queue.cc @@ -100,9 +100,14 @@ PrioritizedPacketQueue::QueuedPacket PrioritizedPacketQueue::StreamQueue::DequeuePacket(int priority_level) { QueuedPacket packet = std::move(packets_[priority_level].front()); packets_[priority_level].pop_front(); - if (packet.packet->is_key_frame()) { - --num_keyframe_packets_; + if (!packet.packet) { + LOG_WARN("Packet is null"); + } else { + if (packet.packet->is_key_frame()) { + --num_keyframe_packets_; + } } + return packet; } @@ -368,6 +373,12 @@ bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const { void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) { --size_packets_; + + if (!packet.packet) { + LOG_WARN("Packet is null"); + return; + } + RtpPacketMediaType packet_type = packet.packet->packet_type().value(); --size_packets_per_media_type_[static_cast(packet_type)]; size_payload_ -= packet.PacketSize(); diff --git a/src/rtp/rtp_packet/rtp_packet_history.cpp b/src/rtp/rtp_packet/rtp_packet_history.cpp index df48ae4..b355943 100644 --- a/src/rtp/rtp_packet/rtp_packet_history.cpp +++ b/src/rtp/rtp_packet/rtp_packet_history.cpp @@ -3,10 +3,28 @@ #include "log.h" #include "sequence_number_compare.h" -RtpPacketHistory::RtpPacketHistory(std::shared_ptr clock) - : clock_(clock), +RtpPacketHistory::StoredPacket::StoredPacket( + std::unique_ptr packet, + webrtc::Timestamp send_time, uint64_t insert_order) + : packet_(std::move(packet)), + pending_transmission_(false), + send_time_(send_time), + insert_order_(insert_order), + times_retransmitted_(0) {} + +RtpPacketHistory::StoredPacket::StoredPacket(StoredPacket&&) = default; +RtpPacketHistory::StoredPacket& RtpPacketHistory::StoredPacket::operator=( + RtpPacketHistory::StoredPacket&&) = default; +RtpPacketHistory::StoredPacket::~StoredPacket() = default; + +void RtpPacketHistory::StoredPacket::IncrementTimesRetransmitted() { + ++times_retransmitted_; +} + +RtpPacketHistory::RtpPacketHistory(std::shared_ptr clock) + : clock_(webrtc::Clock::GetWebrtcClockShared(clock)), rtt_(webrtc::TimeDelta::MinusInfinity()), - number_to_store_(0), + number_to_store_(kMaxCapacity), packets_inserted_(0) {} RtpPacketHistory::~RtpPacketHistory() {} @@ -16,15 +34,14 @@ void RtpPacketHistory::SetRtt(webrtc::TimeDelta rtt) { RemoveDeadPackets(); } -void RtpPacketHistory::AddPacket( - std::unique_ptr rtp_packet, - webrtc::Timestamp send_time) { +void RtpPacketHistory::PutRtpPacket( + std::unique_ptr rtp_packet, int64_t send_time) { RemoveDeadPackets(); const uint16_t rtp_seq_no = rtp_packet->SequenceNumber(); int packet_index = GetPacketIndex(rtp_packet->SequenceNumber()); if (packet_index >= 0 && - static_cast(packet_index) < rtp_packet_history_.size() && - rtp_packet_history_[packet_index].rtp_packet != nullptr) { + static_cast(packet_index) < packet_history_.size() && + packet_history_[packet_index].packet_ != nullptr) { LOG_WARN("Duplicate packet inserted: {}", rtp_seq_no); // Remove previous packet to avoid inconsistent state. RemovePacket(packet_index); @@ -33,15 +50,16 @@ void RtpPacketHistory::AddPacket( // Packet to be inserted ahead of first packet, expand front. for (; packet_index < 0; ++packet_index) { - rtp_packet_history_.emplace_front(); + packet_history_.emplace_front(); } // Packet to be inserted behind last packet, expand back. - while (static_cast(rtp_packet_history_.size()) <= packet_index) { - rtp_packet_history_.emplace_back(); + while (static_cast(packet_history_.size()) <= packet_index) { + packet_history_.emplace_back(); } - rtp_packet_history_[packet_index] = {std::move(rtp_packet), send_time, - packets_inserted_++}; + packet_history_[packet_index] = {std::move(rtp_packet), + webrtc::Timestamp::Micros(send_time), + packets_inserted_++}; } void RtpPacketHistory::RemoveDeadPackets() { @@ -50,23 +68,27 @@ void RtpPacketHistory::RemoveDeadPackets() { rtt_.IsFinite() ? (std::max)(kMinPacketDurationRtt * rtt_, kMinPacketDuration) : kMinPacketDuration; - while (!rtp_packet_history_.empty()) { - if (rtp_packet_history_.size() >= kMaxCapacity) { + while (!packet_history_.empty()) { + if (packet_history_.size() >= kMaxCapacity) { // We have reached the absolute max capacity, remove one packet // unconditionally. RemovePacket(0); continue; } - const RtpPacketToSendInfo& stored_packet = rtp_packet_history_.front(); + const StoredPacket& stored_packet = packet_history_.front(); + if (stored_packet.pending_transmission_) { + // Don't remove packets in the pacer queue, pending tranmission. + return; + } - if (stored_packet.send_time + packet_duration > now) { + if (stored_packet.send_time() + packet_duration > now) { // Don't cull packets too early to avoid failed retransmission requests. return; } - if (rtp_packet_history_.size() >= number_to_store_ || - stored_packet.send_time + + if (packet_history_.size() >= number_to_store_ || + stored_packet.send_time() + (packet_duration * kPacketCullingDelayFactor) <= now) { // Too many packets in history, or this packet has timed out. Remove it @@ -79,15 +101,80 @@ void RtpPacketHistory::RemoveDeadPackets() { } } +std::unique_ptr +RtpPacketHistory::GetPacketAndMarkAsPending(uint16_t sequence_number) { + return GetPacketAndMarkAsPending( + sequence_number, [](const webrtc::RtpPacketToSend& packet) { + return std::make_unique(packet); + }); +} + +std::unique_ptr +RtpPacketHistory::GetPacketAndMarkAsPending( + uint16_t sequence_number, + std::function( + const webrtc::RtpPacketToSend&)> + encapsulate) { + StoredPacket* packet = GetStoredPacket(sequence_number); + if (packet == nullptr) { + return nullptr; + } + + if (packet->pending_transmission_) { + // Packet already in pacer queue, ignore this request. + return nullptr; + } + + if (!VerifyRtt(*packet)) { + // Packet already resent within too short a time window, ignore. + return nullptr; + } + + // Copy and/or encapsulate packet. + std::unique_ptr encapsulated_packet = + encapsulate(*packet->packet_); + if (encapsulated_packet) { + packet->pending_transmission_ = true; + } + + return encapsulated_packet; +} + +void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) { + StoredPacket* packet = GetStoredPacket(sequence_number); + if (packet == nullptr) { + return; + } + + // Update send-time, mark as no longer in pacer queue, and increment + // transmission count. + packet->set_send_time(clock_->CurrentTime()); + packet->pending_transmission_ = false; + packet->IncrementTimesRetransmitted(); +} + +bool RtpPacketHistory::VerifyRtt( + const RtpPacketHistory::StoredPacket& packet) const { + if (packet.times_retransmitted() > 0 && + clock_->CurrentTime() - packet.send_time() < rtt_) { + // This packet has already been retransmitted once, and the time since + // that even is lower than on RTT. Ignore request as this packet is + // likely already in the network pipe. + return false; + } + + return true; +} + std::unique_ptr RtpPacketHistory::RemovePacket( int packet_index) { // Move the packet out from the StoredPacket container. std::unique_ptr rtp_packet = - std::move(rtp_packet_history_[packet_index].rtp_packet); + std::move(packet_history_[packet_index].packet_); if (packet_index == 0) { - while (!rtp_packet_history_.empty() && - rtp_packet_history_.front().rtp_packet == nullptr) { - rtp_packet_history_.pop_front(); + while (!packet_history_.empty() && + packet_history_.front().packet_ == nullptr) { + packet_history_.pop_front(); } } @@ -95,11 +182,11 @@ std::unique_ptr RtpPacketHistory::RemovePacket( } int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const { - if (rtp_packet_history_.empty()) { + if (packet_history_.empty()) { return 0; } - int first_seq = rtp_packet_history_.front().rtp_packet->SequenceNumber(); + int first_seq = packet_history_.front().packet_->SequenceNumber(); if (first_seq == sequence_number) { return 0; } @@ -118,4 +205,14 @@ int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const { } return packet_index; +} + +RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket( + uint16_t sequence_number) { + int index = GetPacketIndex(sequence_number); + if (index < 0 || static_cast(index) >= packet_history_.size() || + packet_history_[index].packet_ == nullptr) { + return nullptr; + } + return &packet_history_[index]; } \ No newline at end of file diff --git a/src/rtp/rtp_packet/rtp_packet_history.h b/src/rtp/rtp_packet/rtp_packet_history.h index 9537693..578ce8c 100644 --- a/src/rtp/rtp_packet/rtp_packet_history.h +++ b/src/rtp/rtp_packet/rtp_packet_history.h @@ -11,6 +11,7 @@ #include #include "api/clock/clock.h" +#include "clock/system_clock.h" #include "rtp_packet_to_send.h" class RtpPacketHistory { @@ -26,39 +27,70 @@ class RtpPacketHistory { static constexpr int kPacketCullingDelayFactor = 3; public: - RtpPacketHistory(std::shared_ptr clock); + RtpPacketHistory(std::shared_ptr clock); ~RtpPacketHistory(); public: void SetRtt(webrtc::TimeDelta rtt); - void AddPacket(std::unique_ptr rtp_packet, - webrtc::Timestamp send_time); - void RemoveDeadPackets(); + void PutRtpPacket(std::unique_ptr rtp_packet, + int64_t send_time); + void MarkPacketAsSent(uint16_t sequence_number); + + std::unique_ptr GetPacketAndMarkAsPending( + uint16_t sequence_number); + + std::unique_ptr GetPacketAndMarkAsPending( + uint16_t sequence_number, + std::function( + const webrtc::RtpPacketToSend&)> + encapsulate); private: std::unique_ptr RemovePacket(int packet_index); int GetPacketIndex(uint16_t sequence_number) const; private: - struct RtpPacketToSendInfo { - RtpPacketToSendInfo() = default; - RtpPacketToSendInfo(std::unique_ptr rtp_packet, - webrtc::Timestamp send_time, uint64_t index) - : rtp_packet(std::move(rtp_packet)), - send_time(send_time), - index(index) {} - RtpPacketToSendInfo(RtpPacketToSendInfo&&) = default; - RtpPacketToSendInfo& operator=(RtpPacketToSendInfo&&) = default; - ~RtpPacketToSendInfo() = default; + class StoredPacket { + public: + StoredPacket() = default; + StoredPacket(std::unique_ptr packet, + webrtc::Timestamp send_time, uint64_t insert_order); + StoredPacket(StoredPacket&&); + StoredPacket& operator=(StoredPacket&&); + ~StoredPacket(); - std::unique_ptr rtp_packet; - webrtc::Timestamp send_time = webrtc::Timestamp::Zero(); - uint64_t index; + uint64_t insert_order() const { return insert_order_; } + size_t times_retransmitted() const { return times_retransmitted_; } + void IncrementTimesRetransmitted(); + + // The time of last transmission, including retransmissions. + webrtc::Timestamp send_time() const { return send_time_; } + void set_send_time(webrtc::Timestamp value) { send_time_ = value; } + + // The actual packet. + std::unique_ptr packet_; + + // True if the packet is currently in the pacer queue pending transmission. + bool pending_transmission_; + + private: + webrtc::Timestamp send_time_ = webrtc::Timestamp::Zero(); + + // Unique number per StoredPacket, incremented by one for each added + // packet. Used to sort on insert order. + uint64_t insert_order_; + + // Number of times RE-transmitted, ie excluding the first transmission. + size_t times_retransmitted_; }; + void RemoveDeadPackets(); + bool VerifyRtt(const StoredPacket& packet) const; + StoredPacket* GetStoredPacket(uint16_t sequence_number); + private: std::shared_ptr clock_; - std::deque rtp_packet_history_; + std::deque packet_history_; uint64_t packets_inserted_; webrtc::TimeDelta rtt_; size_t number_to_store_; diff --git a/src/transport/channel/rtp_video_receiver.cpp b/src/transport/channel/rtp_video_receiver.cpp index 6a683d4..0d6f0b2 100644 --- a/src/transport/channel/rtp_video_receiver.cpp +++ b/src/transport/channel/rtp_video_receiver.cpp @@ -11,6 +11,7 @@ #define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2) #define RTCP_RR_INTERVAL 1000 +#define MAX_WAIT_TIME_MS 20 // 20ms RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock) : ssrc_(GenerateUniqueSsrc()), @@ -231,8 +232,23 @@ void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) { } else if (rtp::NAL_UNIT_TYPE::FU_A == nalu_type) { incomplete_h264_frame_list_[rtp_packet_h264.SequenceNumber()] = rtp_packet_h264; - bool complete = CheckIsH264FrameCompleted(rtp_packet_h264); - if (!complete) { + if (incomplete_h264_frame_list_.find( + rtp_packet_h264.SequenceNumber()) == + incomplete_h264_frame_list_.end()) { + LOG_ERROR("missing seq {}", rtp_packet_h264.SequenceNumber()); + } + if (rtp_packet_h264.FuAEnd()) { + CheckIsH264FrameCompletedFuaEndReceived(rtp_packet_h264); + } else { + auto missing_seqs_iter = + missing_sequence_numbers_.find(rtp_packet_h264.Timestamp()); + if (missing_seqs_iter != missing_sequence_numbers_.end()) { + auto missing_seqs = missing_seqs_iter->second; + if (missing_seqs.find(rtp_packet_h264.SequenceNumber()) != + missing_seqs.end()) { + CheckIsH264FrameCompletedMissSeqReceived(rtp_packet_h264); + } + } } } } else if (rtp::PAYLOAD_TYPE::H264 - 1 == rtp_packet_h264.PayloadType()) { @@ -366,78 +382,181 @@ void RtpVideoReceiver::ProcessAv1RtpPacket(RtpPacketAv1& rtp_packet_av1) { // } } -bool RtpVideoReceiver::CheckIsH264FrameCompleted( +bool RtpVideoReceiver::CheckIsH264FrameCompletedFuaEndReceived( RtpPacketH264& rtp_packet_h264) { - if (rtp_packet_h264.FuAEnd()) { - uint16_t end_seq = rtp_packet_h264.SequenceNumber(); - while (end_seq--) { - auto it = incomplete_h264_frame_list_.find(end_seq); - if (it == incomplete_h264_frame_list_.end()) { - if (padding_sequence_numbers_.find(end_seq) == - padding_sequence_numbers_.end()) { - return false; - } else { - continue; - } - } else if (!it->second.FuAStart()) { - continue; - } else if (it->second.FuAStart()) { - if (!nv12_data_) { - nv12_data_ = new uint8_t[NV12_BUFFER_SIZE]; - } + uint64_t timestamp = rtp_packet_h264.Timestamp(); + uint16_t end_seq = rtp_packet_h264.SequenceNumber(); + fua_end_sequence_numbers_[timestamp] = end_seq; + uint16_t start_seq = 0; + bool has_start = false; + bool has_missing = false; + missing_sequence_numbers_wait_time_[timestamp] = clock_->CurrentTime().ms(); - size_t complete_frame_size = 0; - int frame_fragment_count = 0; - uint16_t start = it->first; - uint16_t end = rtp_packet_h264.SequenceNumber(); - for (uint16_t seq = start; seq <= end; seq++) { - if (padding_sequence_numbers_.find(seq) == - padding_sequence_numbers_.end()) { - complete_frame_size += - incomplete_h264_frame_list_[seq].PayloadSize(); - } else { - padding_sequence_numbers_.erase(seq); - } - } + for (uint16_t seq = end_seq; seq > 0; --seq) { + auto it = incomplete_h264_frame_list_.find(seq); + if (it == incomplete_h264_frame_list_.end()) { + if (padding_sequence_numbers_.find(seq) == + padding_sequence_numbers_.end()) { + missing_sequence_numbers_[timestamp].insert(seq); + LOG_WARN("missing {}", seq); + } + } else if (it->second.FuAStart()) { + start_seq = seq; + has_start = true; + break; + } + } - if (!nv12_data_) { - nv12_data_ = new uint8_t[NV12_BUFFER_SIZE]; - } else if (complete_frame_size > NV12_BUFFER_SIZE) { - delete[] nv12_data_; - nv12_data_ = new uint8_t[complete_frame_size]; - } + if (!has_start) { + return false; + } - uint8_t* dest = nv12_data_; - for (uint16_t seq = start; seq <= end; seq++) { - size_t payload_size = incomplete_h264_frame_list_[seq].PayloadSize(); - if (payload_size) { - memcpy(dest, incomplete_h264_frame_list_[seq].Payload(), - payload_size); - } - dest += payload_size; - incomplete_h264_frame_list_.erase(seq); - frame_fragment_count++; - } + if (missing_sequence_numbers_.find(timestamp) != + missing_sequence_numbers_.end()) { + if (!missing_sequence_numbers_[timestamp].empty()) { + return false; + } + } - ReceivedFrame received_frame(nv12_data_, complete_frame_size); - received_frame.SetReceivedTimestamp(clock_->CurrentTime().us()); - received_frame.SetCapturedTimestamp( - (static_cast(rtp_packet_h264.Timestamp()) / - rtp::kMsToRtpTimestamp - - delta_ntp_internal_ms_) * - 1000); - compelete_video_frame_queue_.push(received_frame); + size_t complete_frame_size = 0; + int frame_fragment_count = 0; - return true; - } else { - LOG_WARN("What happened?"); + for (uint16_t seq = start_seq; seq <= end_seq; ++seq) { + if (padding_sequence_numbers_.find(seq) != + padding_sequence_numbers_.end()) { + padding_sequence_numbers_.erase(seq); + continue; + } + if (incomplete_h264_frame_list_.find(seq) != + incomplete_h264_frame_list_.end()) { + complete_frame_size += incomplete_h264_frame_list_[seq].PayloadSize(); + } + } + + if (!nv12_data_) { + nv12_data_ = new uint8_t[NV12_BUFFER_SIZE]; + } else if (complete_frame_size > NV12_BUFFER_SIZE) { + delete[] nv12_data_; + nv12_data_ = new uint8_t[complete_frame_size]; + } + + uint8_t* dest = nv12_data_; + for (uint16_t seq = start_seq; seq <= end_seq; ++seq) { + if (incomplete_h264_frame_list_.find(seq) != + incomplete_h264_frame_list_.end()) { + size_t payload_size = incomplete_h264_frame_list_[seq].PayloadSize(); + memcpy(dest, incomplete_h264_frame_list_[seq].Payload(), payload_size); + dest += payload_size; + incomplete_h264_frame_list_.erase(seq); + frame_fragment_count++; + } + } + + ReceivedFrame received_frame(nv12_data_, complete_frame_size); + received_frame.SetReceivedTimestamp(clock_->CurrentTime().us()); + received_frame.SetCapturedTimestamp( + (static_cast(timestamp) / rtp::kMsToRtpTimestamp - + delta_ntp_internal_ms_) * + 1000); + + fua_end_sequence_numbers_.erase(timestamp); + missing_sequence_numbers_wait_time_.erase(timestamp); + missing_sequence_numbers_.erase(timestamp); + compelete_video_frame_queue_.push(received_frame); + + return true; +} + +bool RtpVideoReceiver::CheckIsH264FrameCompletedMissSeqReceived( + RtpPacketH264& rtp_packet_h264) { + if (fua_end_sequence_numbers_.find(rtp_packet_h264.Timestamp()) == + fua_end_sequence_numbers_.end()) { + return false; + } + + uint64_t timestamp = rtp_packet_h264.Timestamp(); + uint16_t end_seq = fua_end_sequence_numbers_[timestamp]; + uint16_t start_seq = 0; + bool has_start = false; + bool has_missing = false; + + for (uint16_t seq = end_seq; seq > 0; --seq) { + auto it = incomplete_h264_frame_list_.find(seq); + if (it == incomplete_h264_frame_list_.end()) { + if (padding_sequence_numbers_.find(seq) == + padding_sequence_numbers_.end()) { + return false; + } + } else if (it->second.FuAStart()) { + start_seq = seq; + has_start = true; + break; + } + } + + if (!has_start) { + return false; + } + + if (missing_sequence_numbers_.find(timestamp) != + missing_sequence_numbers_.end() && + missing_sequence_numbers_wait_time_.find(timestamp) != + missing_sequence_numbers_wait_time_.end()) { + if (!missing_sequence_numbers_[timestamp].empty()) { + int64_t wait_time = clock_->CurrentTime().us() - + missing_sequence_numbers_wait_time_[timestamp]; + if (wait_time < MAX_WAIT_TIME_MS) { return false; } } - - return true; } - return false; + + size_t complete_frame_size = 0; + int frame_fragment_count = 0; + + for (uint16_t seq = start_seq; seq <= end_seq; ++seq) { + if (padding_sequence_numbers_.find(seq) != + padding_sequence_numbers_.end()) { + padding_sequence_numbers_.erase(seq); + continue; + } + if (incomplete_h264_frame_list_.find(seq) != + incomplete_h264_frame_list_.end()) { + complete_frame_size += incomplete_h264_frame_list_[seq].PayloadSize(); + } + } + + if (!nv12_data_) { + nv12_data_ = new uint8_t[NV12_BUFFER_SIZE]; + } else if (complete_frame_size > NV12_BUFFER_SIZE) { + delete[] nv12_data_; + nv12_data_ = new uint8_t[complete_frame_size]; + } + + uint8_t* dest = nv12_data_; + for (uint16_t seq = start_seq; seq <= end_seq; ++seq) { + if (incomplete_h264_frame_list_.find(seq) != + incomplete_h264_frame_list_.end()) { + size_t payload_size = incomplete_h264_frame_list_[seq].PayloadSize(); + memcpy(dest, incomplete_h264_frame_list_[seq].Payload(), payload_size); + dest += payload_size; + incomplete_h264_frame_list_.erase(seq); + frame_fragment_count++; + } + } + + ReceivedFrame received_frame(nv12_data_, complete_frame_size); + received_frame.SetReceivedTimestamp(clock_->CurrentTime().us()); + received_frame.SetCapturedTimestamp( + (static_cast(timestamp) / rtp::kMsToRtpTimestamp - + delta_ntp_internal_ms_) * + 1000); + + missing_sequence_numbers_.erase(timestamp); + missing_sequence_numbers_wait_time_.erase(timestamp); + compelete_video_frame_queue_.push(received_frame); + + return true; } bool RtpVideoReceiver::CheckIsAv1FrameCompleted(RtpPacketAv1& rtp_packet_av1) { diff --git a/src/transport/channel/rtp_video_receiver.h b/src/transport/channel/rtp_video_receiver.h index 394fdc5..ff2c954 100644 --- a/src/transport/channel/rtp_video_receiver.h +++ b/src/transport/channel/rtp_video_receiver.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "api/clock/clock.h" @@ -55,7 +56,8 @@ class RtpVideoReceiver : public ThreadBase, private: void ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264); - bool CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264); + bool CheckIsH264FrameCompletedFuaEndReceived(RtpPacketH264& rtp_packet_h264); + bool CheckIsH264FrameCompletedMissSeqReceived(RtpPacketH264& rtp_packet_h264); private: bool CheckIsTimeSendRR(); @@ -113,7 +115,11 @@ class RtpVideoReceiver : public ThreadBase, // std::map> fec_repair_symbol_list_; std::set incomplete_fec_frame_list_; std::map> incomplete_fec_packet_list_; - std::unordered_set padding_sequence_numbers_; + std::unordered_set padding_sequence_numbers_; + std::unordered_map> + missing_sequence_numbers_; + std::unordered_map fua_end_sequence_numbers_; + std::unordered_map missing_sequence_numbers_wait_time_; private: std::thread rtcp_thread_; diff --git a/src/transport/channel/rtp_video_sender.cpp b/src/transport/channel/rtp_video_sender.cpp index f5c26f0..a766846 100644 --- a/src/transport/channel/rtp_video_sender.cpp +++ b/src/transport/channel/rtp_video_sender.cpp @@ -16,7 +16,6 @@ RtpVideoSender::RtpVideoSender(std::shared_ptr clock, std::shared_ptr io_statistics) : ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics), - rtp_packet_history_(std::make_unique(clock_)), clock_(webrtc::Clock::GetWebrtcClockShared(clock)) { SetPeriod(std::chrono::milliseconds(5)); #ifdef SAVE_RTP_SENT_STREAM @@ -130,8 +129,6 @@ int RtpVideoSender::SendRtpPacket( if (on_sent_packet_func_) { on_sent_packet_func_(*rtp_packet_to_send); - rtp_packet_history_->AddPacket(std::move(rtp_packet_to_send), - clock_->CurrentTime()); } return 0; diff --git a/src/transport/channel/rtp_video_sender.h b/src/transport/channel/rtp_video_sender.h index e043ce0..847140c 100644 --- a/src/transport/channel/rtp_video_sender.h +++ b/src/transport/channel/rtp_video_sender.h @@ -9,7 +9,6 @@ #include "receiver_report.h" #include "ringbuffer.h" #include "rtp_packet.h" -#include "rtp_packet_history.h" #include "rtp_packet_to_send.h" #include "rtp_statistics.h" #include "sender_report.h" @@ -62,7 +61,6 @@ class RtpVideoSender : public ThreadBase { std::shared_ptr clock_ = nullptr; std::unique_ptr rtp_statistics_ = nullptr; std::shared_ptr io_statistics_ = nullptr; - std::unique_ptr rtp_packet_history_ = nullptr; uint32_t last_send_bytes_ = 0; uint32_t last_send_rtcp_sr_packet_ts_ = 0; uint32_t total_rtp_payload_sent_ = 0; diff --git a/src/transport/channel/video_channel_send.cpp b/src/transport/channel/video_channel_send.cpp index 80ecffc..ef9cd5e 100644 --- a/src/transport/channel/video_channel_send.cpp +++ b/src/transport/channel/video_channel_send.cpp @@ -3,10 +3,6 @@ #include "log.h" #include "rtc_base/network/sent_packet.h" -VideoChannelSend::VideoChannelSend() {} - -VideoChannelSend::~VideoChannelSend() {} - VideoChannelSend::VideoChannelSend( std::shared_ptr clock, std::shared_ptr ice_agent, std::shared_ptr packet_sender, @@ -19,8 +15,11 @@ VideoChannelSend::VideoChannelSend( on_sent_packet_func_(on_sent_packet_func), delta_ntp_internal_ms_(clock->CurrentNtpInMilliseconds() - clock->CurrentTimeMs()), + rtp_packet_history_(clock), clock_(clock){}; +VideoChannelSend::~VideoChannelSend() {} + void VideoChannelSend::Initialize(rtp::PAYLOAD_TYPE payload_type) { rtp_video_sender_ = std::make_unique(clock_, ice_io_statistics_); @@ -58,6 +57,39 @@ void VideoChannelSend::SetEnqueuePacketsFunc( rtp_video_sender_->SetEnqueuePacketsFunc(enqueue_packets_func); } +void VideoChannelSend::OnSentRtpPacket( + std::unique_ptr packet) { + if (packet->retransmitted_sequence_number()) { + rtp_packet_history_.MarkPacketAsSent( + *packet->retransmitted_sequence_number()); + } else if (packet->PayloadType() != rtp::PAYLOAD_TYPE::H264 - 1) { + rtp_packet_history_.PutRtpPacket(std::move(packet), clock_->CurrentTime()); + } +} + +void VideoChannelSend::OnReceiveNack( + const std::vector& nack_sequence_numbers) { + // int64_t rtt = rtt_ms(); + // if (rtt == 0) { + // if (std::optional average_rtt = + // rtcp_receiver_.AverageRtt()) { + // rtt = average_rtt->ms(); + // } + // } + + int64_t avg_rtt = 10; + rtp_packet_history_.SetRtt(TimeDelta::Millis(5 + avg_rtt)); + for (uint16_t seq_no : nack_sequence_numbers) { + const int32_t bytes_sent = ReSendPacket(seq_no); + if (bytes_sent < 0) { + // Failed to send one Sequence number. Give up the rest in this nack. + LOG_WARN("Failed resending RTP packet {}, Discard rest of packets", + seq_no); + break; + } + } +} + std::vector> VideoChannelSend::GeneratePadding( uint32_t payload_size, int64_t captured_timestamp_us) { if (rtp_packetizer_) { @@ -87,3 +119,46 @@ int VideoChannelSend::SendVideo(std::shared_ptr encoded_frame) { return 0; } + +int32_t VideoChannelSend::ReSendPacket(uint16_t packet_id) { + int32_t packet_size = 0; + + std::unique_ptr packet = + rtp_packet_history_.GetPacketAndMarkAsPending( + packet_id, [&](const webrtc::RtpPacketToSend& stored_packet) { + // Check if we're overusing retransmission bitrate. + // TODO(sprang): Add histograms for nack success or failure + // reasons. + packet_size = stored_packet.size(); + std::unique_ptr retransmit_packet; + + retransmit_packet = + std::make_unique(stored_packet); + + if (retransmit_packet) { + retransmit_packet->set_retransmitted_sequence_number( + stored_packet.SequenceNumber()); + retransmit_packet->set_original_ssrc(stored_packet.Ssrc()); + } + return retransmit_packet; + }); + if (packet_size == 0) { + // Packet not found or already queued for retransmission, ignore. + return 0; + } + if (!packet) { + // Packet was found, but lambda helper above chose not to create + // `retransmit_packet` out of it. + LOG_WARN("packet not found"); + return -1; + } + + packet->set_packet_type(webrtc::RtpPacketMediaType::kRetransmission); + packet->set_fec_protect_packet(false); + std::vector> packets; + packets.emplace_back(std::move(packet)); + + packet_sender_->EnqueueRtpPacket(std::move(packets)); + + return packet_size; +} \ No newline at end of file diff --git a/src/transport/channel/video_channel_send.h b/src/transport/channel/video_channel_send.h index 5f035d3..d0f69dd 100644 --- a/src/transport/channel/video_channel_send.h +++ b/src/transport/channel/video_channel_send.h @@ -15,13 +15,13 @@ #include "encoded_frame.h" #include "ice_agent.h" #include "packet_sender.h" +#include "rtp_packet_history.h" #include "rtp_packetizer.h" #include "rtp_video_sender.h" #include "transport_feedback_adapter.h" class VideoChannelSend { public: - VideoChannelSend(); VideoChannelSend(std::shared_ptr clock, std::shared_ptr ice_agent, std::shared_ptr packet_sender, @@ -35,15 +35,13 @@ class VideoChannelSend { void(std::vector>&)> enqueue_packets_func); + void OnSentRtpPacket(std::unique_ptr packet); + + void OnReceiveNack(const std::vector& nack_sequence_numbers); + std::vector> GeneratePadding( uint32_t payload_size, int64_t captured_timestamp_us); - int64_t GetTransportSeqAndIncrement() { - int64_t transport_seq = rtp_video_sender_->GetTransportSequenceNumber(); - rtp_video_sender_->IncrementTransportSequenceNumber(); - return transport_seq; - } - public: void Initialize(rtp::PAYLOAD_TYPE payload_type); void Destroy(); @@ -57,10 +55,6 @@ class VideoChannelSend { int SendVideo(std::shared_ptr encoded_frame); - void OnCongestionControlFeedback( - Timestamp recv_ts, - const webrtc::rtcp::CongestionControlFeedback& feedback); - void OnReceiverReport(const ReceiverReport& receiver_report) { if (rtp_video_sender_) { rtp_video_sender_->OnReceiverReport(receiver_report); @@ -68,9 +62,7 @@ class VideoChannelSend { } private: - void PostUpdates(webrtc::NetworkControlUpdate update); - void UpdateControlState(); - void UpdateCongestedState(); + int32_t ReSendPacket(uint16_t packet_id); private: std::shared_ptr packet_sender_ = nullptr; @@ -84,6 +76,7 @@ class VideoChannelSend { private: std::shared_ptr clock_; + RtpPacketHistory rtp_packet_history_; int64_t delta_ntp_internal_ms_; }; diff --git a/src/transport/ice_transport.cpp b/src/transport/ice_transport.cpp index 18ac745..b40102d 100644 --- a/src/transport/ice_transport.cpp +++ b/src/transport/ice_transport.cpp @@ -399,20 +399,12 @@ bool IceTransport::HandleNack(const RtcpCommonHeader &rtcp_block, return false; } - // uint32_t first_media_source_ssrc = nack.ssrc(); - // if (first_media_source_ssrc == local_media_ssrc() || - // registered_ssrcs_.contains(first_media_source_ssrc)) { - // rtcp_packet_info->nack.emplace(std::move(nack)); - // } + if (ice_transport_controller_) { + ice_transport_controller_->OnReceiveNack(nack.packet_ids()); + return true; + } - // int64_t rtt = rtt_ms(); - // if (rtt == 0) { - // if (std::optional average_rtt = rtcp_receiver_.AverageRtt()) { - // rtt = average_rtt->ms(); - // } - // } - - return true; + return false; } bool IceTransport::HandleFir(const RtcpCommonHeader &rtcp_block, diff --git a/src/transport/ice_transport_controller.cpp b/src/transport/ice_transport_controller.cpp index e0fe46d..c2cc7dc 100644 --- a/src/transport/ice_transport_controller.cpp +++ b/src/transport/ice_transport_controller.cpp @@ -10,16 +10,16 @@ IceTransportController::IceTransportController( std::shared_ptr clock) - : last_report_block_time_( + : clock_(clock), + webrtc_clock_(webrtc::Clock::GetWebrtcClockShared(clock)), + last_report_block_time_( webrtc::Timestamp::Millis(webrtc_clock_->TimeInMilliseconds())), b_force_i_frame_(true), video_codec_inited_(false), audio_codec_inited_(false), load_nvcodec_dll_success_(false), hardware_acceleration_(false), - congestion_window_size_(DataSize::PlusInfinity()), - clock_(clock), - webrtc_clock_(webrtc::Clock::GetWebrtcClockShared(clock)) { + congestion_window_size_(DataSize::PlusInfinity()) { SetPeriod(std::chrono::milliseconds(25)); } @@ -60,10 +60,25 @@ void IceTransportController::Create( packet_sender_->SetSendBurstInterval(TimeDelta::Millis(40)); packet_sender_->SetQueueTimeLimit(TimeDelta::Millis(2000)); packet_sender_->SetOnSentPacketFunc( - [this](const webrtc::RtpPacketToSend& packet) { + [this](std::unique_ptr packet) { if (ice_agent_) { - ice_agent_->Send((const char*)packet.Buffer().data(), packet.Size()); - OnSentRtpPacket(packet); + webrtc::Timestamp now = webrtc_clock_->CurrentTime(); + ice_agent_->Send((const char*)packet->Buffer().data(), + packet->Size()); + OnSentRtpPacket(*packet); + + if (packet->packet_type().has_value()) { + switch (packet->packet_type().value()) { + case webrtc::RtpPacketMediaType::kVideo: + case webrtc::RtpPacketMediaType::kRetransmission: + if (video_channel_send_) { + video_channel_send_->OnSentRtpPacket(std::move(packet)); + } + break; + default: + break; + } + } } }); @@ -448,12 +463,13 @@ void IceTransportController::OnReceiverReport( report_block.ExtendedHighSeqNum(); last_loss_report.cumulative_lost = report_block.CumulativeLost(); } - // Can only compute delta if there has been previous blocks to compare to. If - // not, total_packets_delta will be unchanged and there's nothing more to do. + // Can only compute delta if there has been previous blocks to compare to. + // If not, total_packets_delta will be unchanged and there's nothing more to + // do. if (!total_packets_delta) return; int packets_received_delta = total_packets_delta - total_packets_lost_delta; - // To detect lost packets, at least one packet has to be received. This check - // is needed to avoid bandwith detection update in + // To detect lost packets, at least one packet has to be received. This + // check is needed to avoid bandwith detection update in // VideoSendStreamTest.SuspendBelowMinBitrate if (packets_received_delta < 1) { @@ -489,6 +505,13 @@ void IceTransportController::HandleTransportPacketsFeedback( UpdateCongestedState(); } +void IceTransportController::OnReceiveNack( + const std::vector& nack_sequence_numbers) { + if (video_channel_send_) { + video_channel_send_->OnReceiveNack(nack_sequence_numbers); + } +} + void IceTransportController::UpdateControllerWithTimeInterval() { ProcessInterval msg; msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); diff --git a/src/transport/ice_transport_controller.h b/src/transport/ice_transport_controller.h index ed18011..a482f22 100644 --- a/src/transport/ice_transport_controller.h +++ b/src/transport/ice_transport_controller.h @@ -74,6 +74,7 @@ class IceTransportController void OnReceiverReport(const std::vector &report_block_datas); void OnCongestionControlFeedback( const webrtc::rtcp::CongestionControlFeedback &feedback); + void OnReceiveNack(const std::vector &nack_sequence_numbers); private: int CreateVideoCodec(std::shared_ptr clock, diff --git a/src/transport/packet_sender/packet_sender.h b/src/transport/packet_sender/packet_sender.h index d5f4442..a780971 100644 --- a/src/transport/packet_sender/packet_sender.h +++ b/src/transport/packet_sender/packet_sender.h @@ -11,6 +11,7 @@ #include #include "rtp_packet.h" +#include "rtp_packet_to_send.h" class PacketSender { public: @@ -19,8 +20,12 @@ class PacketSender { virtual int Send() = 0; virtual int EnqueueRtpPacket( - std::vector> &rtp_packets, + std::vector>& rtp_packets, int64_t captured_timestamp_us) = 0; + + virtual int EnqueueRtpPacket( + std::vector>& rtp_packets) = 0; + ; }; #endif \ No newline at end of file diff --git a/src/transport/packet_sender/packet_sender_imp.cpp b/src/transport/packet_sender/packet_sender_imp.cpp index ae42ba3..e376b9f 100644 --- a/src/transport/packet_sender/packet_sender_imp.cpp +++ b/src/transport/packet_sender/packet_sender_imp.cpp @@ -290,4 +290,10 @@ int PacketSenderImp::EnqueueRtpPacket( EnqueuePackets(std::move(to_send_rtp_packets)); return 0; +} + +int PacketSenderImp::EnqueueRtpPacket( + std::vector> &rtp_packets) { + EnqueuePackets(std::move(rtp_packets)); + return 0; } \ No newline at end of file diff --git a/src/transport/packet_sender/packet_sender_imp.h b/src/transport/packet_sender/packet_sender_imp.h index 05564fd..a6d6bf6 100644 --- a/src/transport/packet_sender/packet_sender_imp.h +++ b/src/transport/packet_sender/packet_sender_imp.h @@ -35,13 +35,18 @@ class PacketSenderImp : public PacketSender, ~PacketSenderImp(); public: - int Send() { return 0; } + int Send() override { return 0; } int EnqueueRtpPacket(std::vector>& rtp_packets, - int64_t captured_timestamp_us); + int64_t captured_timestamp_us) override; + int EnqueueRtpPacket(std::vector>& + rtp_packets) override; + + public: void SetOnSentPacketFunc( - std::function on_sent_packet_func) { + std::function)> + on_sent_packet_func) { on_sent_packet_func_ = on_sent_packet_func; } @@ -59,8 +64,12 @@ class PacketSenderImp : public PacketSender, ssrc_seq_[packet->Ssrc()] = 1; } - packet->UpdateSequenceNumber(ssrc_seq_[packet->Ssrc()]++); - on_sent_packet_func_(*packet); + if (packet->packet_type() != + webrtc::RtpPacketMediaType::kRetransmission) { + packet->UpdateSequenceNumber(ssrc_seq_[packet->Ssrc()]++); + } + + on_sent_packet_func_(std::move(packet)); } } // Should be called after each call to SendPacket(). @@ -176,8 +185,8 @@ class PacketSenderImp : public PacketSender, private: std::shared_ptr ice_agent_ = nullptr; webrtc::PacingController pacing_controller_; - std::function on_sent_packet_func_ = - nullptr; + std::function)> + on_sent_packet_func_ = nullptr; std::function>(uint32_t, int64_t)> generat_padding_func_ = nullptr;