From c69818bb1d2959c1b0d7616d9f92214071561bdc Mon Sep 17 00:00:00 2001 From: dijunkun Date: Wed, 2 Apr 2025 17:34:46 +0800 Subject: [PATCH] [feat] use pending_frames_ instead of compelete_video_frame_queue_ in h264 frame assembler --- src/transport/channel/rtp_video_receiver.cpp | 172 ++++++++----------- src/transport/channel/rtp_video_receiver.h | 12 +- src/transport/channel/video_channel_send.cpp | 2 +- 3 files changed, 82 insertions(+), 104 deletions(-) diff --git a/src/transport/channel/rtp_video_receiver.cpp b/src/transport/channel/rtp_video_receiver.cpp index ca3daff..643f256 100644 --- a/src/transport/channel/rtp_video_receiver.cpp +++ b/src/transport/channel/rtp_video_receiver.cpp @@ -11,7 +11,7 @@ #define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2) #define RTCP_RR_INTERVAL 1000 -#define MAX_WAIT_TIME_MS 20 // 20ms +#define MAX_WAIT_TIME_MS 200 // 200ms #define NACK_UPDATE_INTERVAL 20 // 20ms RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock) @@ -271,15 +271,29 @@ void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) { if (!fec_enable_) { rtp::NAL_UNIT_TYPE nalu_type = rtp_packet_h264.NalUnitType(); if (rtp::NAL_UNIT_TYPE::NALU == nalu_type) { - ReceivedFrame received_frame(rtp_packet_h264.Payload(), - rtp_packet_h264.PayloadSize()); - received_frame.SetReceivedTimestamp(clock_->CurrentTime().us()); - received_frame.SetCapturedTimestamp( + // ReceivedFrame received_frame(rtp_packet_h264.Payload(), + // rtp_packet_h264.PayloadSize()); + // received_frame.SetReceivedTimestamp(clock_->CurrentTime().us()); + // received_frame.SetCapturedTimestamp( + // (static_cast(rtp_packet_h264.Timestamp()) / + // rtp::kMsToRtpTimestamp - + // delta_ntp_internal_ms_) * + // 1000); + // compelete_video_frame_queue_.push(received_frame); + + std::unique_ptr received_frame = + std::make_unique(rtp_packet_h264.Payload(), + rtp_packet_h264.PayloadSize()); + received_frame->SetReceivedTimestamp(clock_->CurrentTime().us()); + 
received_frame->SetCapturedTimestamp( (static_cast(rtp_packet_h264.Timestamp()) / rtp::kMsToRtpTimestamp - delta_ntp_internal_ms_) * 1000); - compelete_video_frame_queue_.push(received_frame); + + std::lock_guard lock(pending_frames_mtx_); + pending_frames_[rtp_packet_h264.Timestamp()] = { + std::move(received_frame), true, clock_->CurrentTime().ms()}; } else if (rtp::NAL_UNIT_TYPE::FU_A == nalu_type) { if (rtp::PAYLOAD_TYPE::H264 == rtp_packet_h264.PayloadType()) { incomplete_h264_frame_list_[rtp_packet_h264.SequenceNumber()] = @@ -473,7 +487,8 @@ bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264, if (clock_->CurrentTime().ms() - missing_seqs_wait_ts_iter->second > MAX_WAIT_TIME_MS) { missing_sequence_numbers_wait_time_.erase(missing_seqs_wait_ts_iter); - LOG_WARN("rtx packet seq {} is timeout", seq); + std::lock_guard lock(pending_frames_mtx_); + pending_frames_.erase(timestamp); return false; } } @@ -483,6 +498,8 @@ bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264, ++sequence_number) { if (incomplete_h264_frame_list_.find(sequence_number) == incomplete_h264_frame_list_.end()) { + std::lock_guard lock(pending_frames_mtx_); + pending_frames_[timestamp] = {nullptr, false, clock_->CurrentTime().ms()}; return false; } } @@ -490,79 +507,6 @@ bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264, return PopCompleteFrame(start_seq, end_seq, timestamp); } -bool RtpVideoReceiver::CheckIsH264FrameCompletedFuaEndReceived( - RtpPacketH264& rtp_packet_h264) { - uint32_t timestamp = rtp_packet_h264.Timestamp(); - uint16_t end_seq = rtp_packet_h264.SequenceNumber(); - fua_end_sequence_numbers_[timestamp] = end_seq; - uint16_t start_seq = 0; - bool has_start = false; - bool has_missing = false; - if (missing_sequence_numbers_wait_time_.find(timestamp) == - missing_sequence_numbers_wait_time_.end()) { - missing_sequence_numbers_wait_time_[timestamp] = clock_->CurrentTime().ms(); - } - - for 
(uint16_t seq = end_seq; seq > 0; --seq) { - auto it = incomplete_h264_frame_list_.find(seq); - if (it->second.FuAStart()) { - start_seq = seq; - has_start = true; - break; - } - } - - if (!has_start) { - return false; - } - - return PopCompleteFrame(start_seq, end_seq, timestamp); -} - -bool RtpVideoReceiver::CheckIsH264FrameCompletedMissSeqReceived( - RtpPacketH264& rtp_packet_h264) { - if (fua_end_sequence_numbers_.find(rtp_packet_h264.Timestamp()) == - fua_end_sequence_numbers_.end()) { - return false; - } - - uint32_t timestamp = rtp_packet_h264.Timestamp(); - uint16_t end_seq = fua_end_sequence_numbers_[timestamp]; - uint16_t start_seq = 0; - bool has_start = false; - bool has_missing = false; - - for (uint16_t seq = end_seq; seq > 0; --seq) { - auto it = incomplete_h264_frame_list_.find(seq); - if (it == incomplete_h264_frame_list_.end()) { - if (padding_sequence_numbers_.find(seq) == - padding_sequence_numbers_.end()) { - return false; - } - } else if (it->second.FuAStart()) { - start_seq = seq; - has_start = true; - break; - } - } - - if (!has_start) { - return false; - } - - if (missing_sequence_numbers_wait_time_.find(timestamp) != - missing_sequence_numbers_wait_time_.end()) { - int64_t wait_time = clock_->CurrentTime().us() - - missing_sequence_numbers_wait_time_[timestamp]; - if (wait_time < MAX_WAIT_TIME_MS) { - return false; - } - } - - LOG_WARN("complete frame"); - return PopCompleteFrame(start_seq, end_seq, timestamp); -} - bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq, uint32_t timestamp) { size_t complete_frame_size = 0; @@ -599,9 +543,10 @@ bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq, } } - ReceivedFrame received_frame(nv12_data_, complete_frame_size); - received_frame.SetReceivedTimestamp(clock_->CurrentTime().us()); - received_frame.SetCapturedTimestamp( + std::unique_ptr received_frame = + std::make_unique(nv12_data_, complete_frame_size); + 
received_frame->SetReceivedTimestamp(clock_->CurrentTime().us()); + received_frame->SetCapturedTimestamp( (static_cast(timestamp) / rtp::kMsToRtpTimestamp - delta_ntp_internal_ms_) * 1000); @@ -609,8 +554,11 @@ bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq, fua_start_sequence_numbers_.erase(timestamp); fua_end_sequence_numbers_.erase(timestamp); missing_sequence_numbers_wait_time_.erase(timestamp); - compelete_video_frame_queue_.push(received_frame); + // compelete_video_frame_queue_.push(received_frame); + std::lock_guard lock(pending_frames_mtx_); + pending_frames_[timestamp] = {std::move(received_frame), true, + clock_->CurrentTime().ms()}; return true; } @@ -749,24 +697,46 @@ bool RtpVideoReceiver::Process() { return false; } - if (!compelete_video_frame_queue_.isEmpty()) { - std::optional video_frame = - compelete_video_frame_queue_.pop(); - if (on_receive_complete_frame_ && video_frame) { - // auto now_complete_frame_ts = - // std::chrono::duration_cast( - // std::chrono::system_clock::now().time_since_epoch()) - // .count(); - // uint32_t duration = now_complete_frame_ts - - // last_complete_frame_ts_; LOG_ERROR("Duration {}", duration); - // last_complete_frame_ts_ = now_complete_frame_ts; + // if (!compelete_video_frame_queue_.isEmpty()) { + // std::optional video_frame = + // compelete_video_frame_queue_.pop(); + // if (on_receive_complete_frame_ && video_frame) { + // // auto now_complete_frame_ts = + // // std::chrono::duration_cast( + // // std::chrono::system_clock::now().time_since_epoch()) + // // .count(); + // // uint32_t duration = now_complete_frame_ts - + // // last_complete_frame_ts_; LOG_ERROR("Duration {}", duration); + // // last_complete_frame_ts_ = now_complete_frame_ts; - on_receive_complete_frame_(*video_frame); - // #ifdef SAVE_RTP_RECV_STREAM - // fwrite((unsigned char*)video_frame.Buffer(), 1, - // video_frame.Size(), - // file_rtp_recv_); - // #endif + // on_receive_complete_frame_(*video_frame); + // 
// #ifdef SAVE_RTP_RECV_STREAM + // // fwrite((unsigned char*)video_frame.Buffer(), 1, + // // video_frame.Size(), + // // file_rtp_recv_); + // // #endif + // } + // } + + std::lock_guard lock(pending_frames_mtx_); + if (pending_frames_.empty()) { + return false; + } + + auto it = pending_frames_.begin(); + while (it != pending_frames_.end()) { + if (it->second.is_complete) { + if (on_receive_complete_frame_) { + on_receive_complete_frame_(*(it->second.frame)); + } + it = pending_frames_.erase(it); + } else { + if (clock_->CurrentTime().ms() - it->second.arrival_time > + MAX_WAIT_TIME_MS) { + it = pending_frames_.erase(it); + } else { + return false; + } } } diff --git a/src/transport/channel/rtp_video_receiver.h b/src/transport/channel/rtp_video_receiver.h index bbda3f3..588e5a9 100644 --- a/src/transport/channel/rtp_video_receiver.h +++ b/src/transport/channel/rtp_video_receiver.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -61,8 +62,6 @@ class RtpVideoReceiver : public ThreadBase, void ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264); bool CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264, bool is_start, bool is_end, bool is_rtx); - bool CheckIsH264FrameCompletedFuaEndReceived(RtpPacketH264& rtp_packet_h264); - bool CheckIsH264FrameCompletedMissSeqReceived(RtpPacketH264& rtp_packet_h264); bool PopCompleteFrame(uint16_t start_seq, uint16_t end_seq, uint32_t timestamp); @@ -104,6 +103,15 @@ class RtpVideoReceiver : public ThreadBase, uint32_t last_complete_frame_ts_ = 0; RingBuffer compelete_video_frame_queue_; + private: + struct PendingFrame { + std::unique_ptr frame; + bool is_complete = false; + int64_t arrival_time = 0; + }; + std::map pending_frames_; + std::mutex pending_frames_mtx_; + private: std::shared_ptr io_statistics_ = nullptr; uint32_t last_recv_bytes_ = 0; diff --git a/src/transport/channel/video_channel_send.cpp b/src/transport/channel/video_channel_send.cpp index 9c0a05a..f6cf4c8 100644 --- 
a/src/transport/channel/video_channel_send.cpp +++ b/src/transport/channel/video_channel_send.cpp @@ -90,7 +90,7 @@ void VideoChannelSend::Destroy() {} int VideoChannelSend::SendVideo(const EncodedFrame& encoded_frame) { if (rtp_packetizer_ && packet_sender_) { - int32_t rtp_timestamp = + uint32_t rtp_timestamp = delta_ntp_internal_ms_ + static_cast(encoded_frame.CapturedTimestamp() / 1000); std::vector> rtp_packets =