From 2cb92ddd72092e443d3197e6a978f731d09d4d1d Mon Sep 17 00:00:00 2001 From: dijunkun Date: Fri, 6 Dec 2024 17:26:52 +0800 Subject: [PATCH] [feat] add rtcp thread in rtp video receiver --- src/rtcp/rtcp_tcc.cpp | 30 ++++++++ src/rtcp/rtcp_tcc.h | 77 +++++++++++++++++++++ src/rtp/rtp_endpoint/rtp_video_receiver.cpp | 75 ++++++++++++++------ src/rtp/rtp_endpoint/rtp_video_receiver.h | 11 +++ src/statistics/io_statistics.cpp | 28 ++++---- 5 files changed, 183 insertions(+), 38 deletions(-) create mode 100644 src/rtcp/rtcp_tcc.cpp create mode 100644 src/rtcp/rtcp_tcc.h diff --git a/src/rtcp/rtcp_tcc.cpp b/src/rtcp/rtcp_tcc.cpp new file mode 100644 index 0000000..03ae599 --- /dev/null +++ b/src/rtcp/rtcp_tcc.cpp @@ -0,0 +1,30 @@ +#include "rtcp_tcc.h" + +RtcpTcc::RtcpTcc() { + buffer_ = new uint8_t[DEFAULT_RR_SIZE]; + size_ = DEFAULT_RR_SIZE; +} + +RtcpTcc::~RtcpTcc() { + if (buffer_) { + delete buffer_; + buffer_ = nullptr; + } + + size_ = 0; +} + +void RtcpTcc::SetReportBlock(RtcpReportBlock &rtcp_report_block) { + reports_.push_back(rtcp_report_block); +} + +void RtcpTcc::SetReportBlock(std::vector &rtcp_report_blocks) { + reports_ = rtcp_report_blocks; +} + +const uint8_t *RtcpTcc::Encode() { + rtcp_header_.Encode(DEFAULT_RTCP_VERSION, 0, DEFAULT_RR_BLOCK_NUM, + RTCP_TYPE::RR, DEFAULT_RR_SIZE, buffer_); + + return buffer_; +} \ No newline at end of file diff --git a/src/rtcp/rtcp_tcc.h b/src/rtcp/rtcp_tcc.h new file mode 100644 index 0000000..268613d --- /dev/null +++ b/src/rtcp/rtcp_tcc.h @@ -0,0 +1,77 @@ +/* + * @Author: DI JUNKUN + * @Date: 2024-12-06 + * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _RTCP_TCC_H_ +#define _RTCP_TCC_H_ + +// RR +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |V=2|P| FMT=15 | PT=205 | length | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | SSRC of packet sender | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | SSRC of media source | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | base sequence number | packet status count | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | reference time | fb pkt. count | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | packet chunk | packet chunk | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// . . +// . . +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | packet chunk | recv delta | recv delta | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// . . +// . . +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | recv delta | recv delta | zero padding | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +// RTP transport sequence number +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | 0xBE | 0xDE | length=1 | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | ID | L=1 |transport-wide sequence number | zero padding | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +#include + +#include "rtcp_header.h" +#include "rtcp_typedef.h" + +class RtcpTcc { + public: + RtcpTcc(); + ~RtcpTcc(); + + public: + void SetReportBlock(RtcpReportBlock &rtcp_report_block); + void SetReportBlock(std::vector &rtcp_report_blocks); + + public: + const uint8_t *Encode(); + size_t Decode(); + + // Entire RTP buffer + const uint8_t *Buffer() const { return buffer_; } + size_t Size() const { return size_; } + + private: + RtcpHeader rtcp_header_; + std::vector reports_; + + // Entire RTCP buffer + uint8_t *buffer_ = nullptr; + size_t size_ = 0; +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_endpoint/rtp_video_receiver.cpp b/src/rtp/rtp_endpoint/rtp_video_receiver.cpp index ce2cf4c..f2a2232 100644 --- a/src/rtp/rtp_endpoint/rtp_video_receiver.cpp +++ b/src/rtp/rtp_endpoint/rtp_video_receiver.cpp @@ -8,12 +8,21 @@ RtpVideoReceiver::RtpVideoReceiver() {} RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr io_statistics) - : io_statistics_(io_statistics) {} + : io_statistics_(io_statistics) { + rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); +} RtpVideoReceiver::~RtpVideoReceiver() { if (rtp_statistics_) { rtp_statistics_->Stop(); } + + rtcp_stop_.store(true); + rtcp_cv_.notify_one(); + + if (rtcp_thread_.joinable()) { + rtcp_thread_.join(); + } } void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { @@ -299,27 +308,6 @@ bool RtpVideoReceiver::CheckIsAv1FrameCompleted(RtpPacket& rtp_packet) { return false; } -bool RtpVideoReceiver::Process() { - if (!compelete_video_frame_queue_.isEmpty()) { - VideoFrame video_frame; - compelete_video_frame_queue_.pop(video_frame); - if (on_receive_complete_frame_) { - // auto now_complete_frame_ts = - // std::chrono::duration_cast( - // std::chrono::system_clock::now().time_since_epoch()) - // .count(); - // uint32_t duration = now_complete_frame_ts - last_complete_frame_ts_; - // LOG_ERROR("Duration {}", duration); - // last_complete_frame_ts_ = now_complete_frame_ts; - - on_receive_complete_frame_(video_frame); - } - } - - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - return true; -} - void RtpVideoReceiver::SetSendDataFunc( std::function data_send_func) { data_send_func_ = data_send_func; @@ -351,4 +339,47 @@ bool RtpVideoReceiver::CheckIsTimeSendRR() { } else { return false; } +} + +bool RtpVideoReceiver::Process() { + if (!compelete_video_frame_queue_.isEmpty()) { + VideoFrame video_frame; + compelete_video_frame_queue_.pop(video_frame); + if (on_receive_complete_frame_) { + // auto now_complete_frame_ts = + // std::chrono::duration_cast( + // std::chrono::system_clock::now().time_since_epoch()) + // .count(); + // uint32_t duration = now_complete_frame_ts - last_complete_frame_ts_; + // LOG_ERROR("Duration {}", duration); + // last_complete_frame_ts_ = now_complete_frame_ts; + + on_receive_complete_frame_(video_frame); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + return true; +} + +void RtpVideoReceiver::RtcpThread() { + while (!rtcp_stop_) { + std::unique_lock lock(rtcp_mtx_); + if (rtcp_cv_.wait_for( + lock, std::chrono::milliseconds(rtcp_tcc_interval_ms_), + [&]() { return send_rtcp_rr_triggered_ || rtcp_stop_; })) { + if (rtcp_stop_) break; + send_rtcp_rr_triggered_ = false; + } else { + LOG_ERROR("Send video tcc"); + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast( + now - last_send_rtcp_rr_ts_) + .count(); + if (elapsed >= rtcp_rr_interval_ms_) { + LOG_ERROR("Send video rr [{}]", (void*)this); + last_send_rtcp_rr_ts_ = now; + } + } + } } \ No newline at end of file diff --git a/src/rtp/rtp_endpoint/rtp_video_receiver.h b/src/rtp/rtp_endpoint/rtp_video_receiver.h index e1bb7a3..5b94afb 100644 --- a/src/rtp/rtp_endpoint/rtp_video_receiver.h +++ b/src/rtp/rtp_endpoint/rtp_video_receiver.h @@ -45,6 +45,7 @@ class RtpVideoReceiver : public ThreadBase { private: bool Process() override; + void RtcpThread(); private: std::map incomplete_frame_list_; @@ -72,6 +73,16 @@ class RtpVideoReceiver : public ThreadBase { // std::map> fec_repair_symbol_list_; std::set incomplete_fec_frame_list_; std::map> incomplete_fec_packet_list_; + + private: + std::thread rtcp_thread_; + std::mutex rtcp_mtx_; + std::condition_variable rtcp_cv_; + std::chrono::steady_clock::time_point last_send_rtcp_rr_ts_; + std::atomic send_rtcp_rr_triggered_ = false; + std::atomic rtcp_stop_ = false; + int rtcp_rr_interval_ms_ = 5000; + int rtcp_tcc_interval_ms_ = 200; }; #endif diff --git a/src/statistics/io_statistics.cpp b/src/statistics/io_statistics.cpp index 263d76f..3521afb 100644 --- a/src/statistics/io_statistics.cpp +++ b/src/statistics/io_statistics.cpp @@ -155,11 +155,11 @@ void IOStatistics::Stop() { } void IOStatistics::UpdateVideoInboundBytes(uint32_t bytes) { - video_inbound_bytes_.fetch_add(bytes, std::memory_order_relaxed); + video_inbound_bytes_ += bytes; } void IOStatistics::UpdateVideoOutboundBytes(uint32_t bytes) { - video_outbound_bytes_.fetch_add(bytes, std::memory_order_relaxed); + video_outbound_bytes_ += bytes; } void IOStatistics::UpdateVideoPacketLossCount(uint16_t seq_num) { @@ -167,13 +167,11 @@ void IOStatistics::UpdateVideoPacketLossCount(uint16_t seq_num) { if (last_v_seq != 0) { if (seq_num > last_v_seq) { if (seq_num - last_v_seq != 1) { - video_rtp_pkt_loss_cnt_.fetch_add(seq_num - last_v_seq - 1, - std::memory_order_relaxed); + video_rtp_pkt_loss_cnt_ += seq_num - last_v_seq - 1; } } else { - video_rtp_pkt_loss_cnt_.fetch_add( - seq_num + (std::numeric_limits::max() - last_v_seq) - 1, - std::memory_order_relaxed); + video_rtp_pkt_loss_cnt_ += + seq_num + (std::numeric_limits::max() - last_v_seq) - 1; } } @@ -181,11 +179,11 @@ void IOStatistics::UpdateVideoPacketLossCount(uint16_t seq_num) { } void IOStatistics::UpdateAudioInboundBytes(uint32_t bytes) { - audio_inbound_bytes_.fetch_add(bytes, std::memory_order_relaxed); + audio_inbound_bytes_ += bytes; } void IOStatistics::UpdateAudioOutboundBytes(uint32_t bytes) { - audio_outbound_bytes_.fetch_add(bytes, std::memory_order_relaxed); + audio_outbound_bytes_ += bytes; } void IOStatistics::UpdateAudioPacketLossCount(uint16_t seq_num) { @@ -193,13 +191,11 @@ void IOStatistics::UpdateAudioPacketLossCount(uint16_t seq_num) { if (last_a_seq != 0) { if (seq_num > last_a_seq) { if (seq_num - last_a_seq != 1) { - audio_rtp_pkt_loss_cnt_.fetch_add(seq_num - last_a_seq - 1, - std::memory_order_relaxed); + audio_rtp_pkt_loss_cnt_ += seq_num - last_a_seq - 1; } } else { - audio_rtp_pkt_loss_cnt_.fetch_add( - seq_num + (std::numeric_limits::max() - last_a_seq) - 1, - std::memory_order_relaxed); + audio_rtp_pkt_loss_cnt_ += + seq_num + (std::numeric_limits::max() - last_a_seq) - 1; } } @@ -207,11 +203,11 @@ void IOStatistics::UpdateAudioPacketLossCount(uint16_t seq_num) { } void IOStatistics::UpdateDataInboundBytes(uint32_t bytes) { - data_inbound_bytes_.fetch_add(bytes, std::memory_order_relaxed); + data_inbound_bytes_ += bytes; } void IOStatistics::UpdateDataOutboundBytes(uint32_t bytes) { - data_outbound_bytes_.fetch_add(bytes, std::memory_order_relaxed); + data_outbound_bytes_ += bytes; } void IOStatistics::UpdateDataPacketLossCount(uint16_t seq_num) {