From 3accdf2192c162fb7ff9d2f3be1daaeed581cf85 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Thu, 6 Feb 2025 17:08:26 +0800 Subject: [PATCH] [fix] fix crash due to rtp receivers destroy --- src/channel/rtp_channel/rtp_audio_sender.cpp | 7 ++-- src/channel/rtp_channel/rtp_data_sender.cpp | 5 ++- .../rtp_channel/rtp_video_receiver.cpp | 22 ++++++----- src/channel/rtp_channel/rtp_video_sender.cpp | 2 +- src/channel/video_channel_receive.cpp | 6 ++- src/thread/thread_base.cpp | 39 ++++++++++++++----- src/thread/thread_base.h | 13 ++++++- src/transport/ice_transport.cpp | 24 ++++++++---- 8 files changed, 82 insertions(+), 36 deletions(-) diff --git a/src/channel/rtp_channel/rtp_audio_sender.cpp b/src/channel/rtp_channel/rtp_audio_sender.cpp index b9fc7e2..ff56fb5 100644 --- a/src/channel/rtp_channel/rtp_audio_sender.cpp +++ b/src/channel/rtp_channel/rtp_audio_sender.cpp @@ -6,10 +6,12 @@ #define RTCP_SR_INTERVAL 1000 -RtpAudioSender::RtpAudioSender() {} +RtpAudioSender::RtpAudioSender() { SetPeriod(std::chrono::milliseconds(5)); } RtpAudioSender::RtpAudioSender(std::shared_ptr io_statistics) - : io_statistics_(io_statistics) {} + : io_statistics_(io_statistics) { + SetPeriod(std::chrono::milliseconds(5)); +} RtpAudioSender::~RtpAudioSender() { if (rtp_statistics_) { @@ -140,6 +142,5 @@ bool RtpAudioSender::Process() { rtp_statistics_->UpdateSentBytes(last_send_bytes_); } - std::this_thread::sleep_for(std::chrono::milliseconds(5)); return true; } \ No newline at end of file diff --git a/src/channel/rtp_channel/rtp_data_sender.cpp b/src/channel/rtp_channel/rtp_data_sender.cpp index 4f52ee1..fe0897a 100644 --- a/src/channel/rtp_channel/rtp_data_sender.cpp +++ b/src/channel/rtp_channel/rtp_data_sender.cpp @@ -9,7 +9,9 @@ RtpDataSender::RtpDataSender() {} RtpDataSender::RtpDataSender(std::shared_ptr io_statistics) - : io_statistics_(io_statistics) {} + : io_statistics_(io_statistics) { + SetPeriod(std::chrono::milliseconds(5)); +} RtpDataSender::~RtpDataSender() { if (rtp_statistics_) { @@ -140,6 +142,5 @@ bool RtpDataSender::Process() { rtp_statistics_->UpdateSentBytes(last_send_bytes_); } - std::this_thread::sleep_for(std::chrono::milliseconds(5)); return true; } \ No newline at end of file diff --git a/src/channel/rtp_channel/rtp_video_receiver.cpp b/src/channel/rtp_channel/rtp_video_receiver.cpp index 05c6249..2762966 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.cpp +++ b/src/channel/rtp_channel/rtp_video_receiver.cpp @@ -20,7 +20,10 @@ RtpVideoReceiver::RtpVideoReceiver() [this](int64_t bitrate_bps, std::vector ssrcs) { SendRemb(bitrate_bps, ssrcs); }), - clock_(Clock::GetRealTimeClockShared()) {} + clock_(Clock::GetRealTimeClockShared()) { + SetPeriod(std::chrono::milliseconds(5)); + // rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); +} RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr io_statistics) : io_statistics_(io_statistics), @@ -34,7 +37,8 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr io_statistics) SendRemb(bitrate_bps, ssrcs); }), clock_(Clock::GetRealTimeClockShared()) { - rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); + SetPeriod(std::chrono::milliseconds(5)); + // rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); #ifdef SAVE_RTP_RECV_STREAM file_rtp_recv_ = fopen("rtp_recv_stream.h264", "w+b"); @@ -45,19 +49,18 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr io_statistics) } RtpVideoReceiver::~RtpVideoReceiver() { - if (rtp_statistics_) { - rtp_statistics_->Stop(); - } - rtcp_stop_.store(true); - rtcp_cv_.notify_one(); - + rtcp_cv_.notify_all(); if (rtcp_thread_.joinable()) { rtcp_thread_.join(); } SSRCManager::Instance().DeleteSsrc(feedback_ssrc_); + if (rtp_statistics_) { + rtp_statistics_->Stop(); + } + #ifdef SAVE_RTP_RECV_STREAM if (file_rtp_recv_) { fflush(file_rtp_recv_); @@ -469,12 +472,11 @@ bool RtpVideoReceiver::Process() { } } - std::this_thread::sleep_for(std::chrono::milliseconds(5)); return true; } void RtpVideoReceiver::RtcpThread() { - while (!rtcp_stop_) { + while (!rtcp_stop_.load()) { std::unique_lock lock(rtcp_mtx_); if (rtcp_cv_.wait_for( lock, std::chrono::milliseconds(rtcp_tcc_interval_ms_), diff --git a/src/channel/rtp_channel/rtp_video_sender.cpp b/src/channel/rtp_channel/rtp_video_sender.cpp index 5f8fbca..a310adc 100644 --- a/src/channel/rtp_channel/rtp_video_sender.cpp +++ b/src/channel/rtp_channel/rtp_video_sender.cpp @@ -12,6 +12,7 @@ RtpVideoSender::RtpVideoSender() {} RtpVideoSender::RtpVideoSender(std::shared_ptr io_statistics) : io_statistics_(io_statistics) { + SetPeriod(std::chrono::milliseconds(5)); #ifdef SAVE_RTP_SENT_STREAM file_rtp_sent_ = fopen("rtp_sent_stream.h264", "w+b"); if (!file_rtp_sent_) { @@ -162,6 +163,5 @@ bool RtpVideoSender::Process() { rtp_statistics_->UpdateSentBytes(last_send_bytes_); } - std::this_thread::sleep_for(std::chrono::milliseconds(5)); return true; } \ No newline at end of file diff --git a/src/channel/video_channel_receive.cpp b/src/channel/video_channel_receive.cpp index bce4875..dc187a7 100644 --- a/src/channel/video_channel_receive.cpp +++ b/src/channel/video_channel_receive.cpp @@ -44,7 +44,11 @@ void VideoChannelReceive::Initialize(rtp::PAYLOAD_TYPE payload_type) { rtp_video_receiver_->Start(); } -void VideoChannelReceive::Destroy() {} +void VideoChannelReceive::Destroy() { + if (rtp_video_receiver_) { + rtp_video_receiver_->Stop(); + } +} int VideoChannelReceive::OnReceiveRtpPacket(const char *data, size_t size) { if (ice_io_statistics_) { diff --git a/src/thread/thread_base.cpp b/src/thread/thread_base.cpp index f397c2c..3160d77 100644 --- a/src/thread/thread_base.cpp +++ b/src/thread/thread_base.cpp @@ -2,23 +2,34 @@ #include "log.h" -ThreadBase::ThreadBase() {} +ThreadBase::ThreadBase() + : running_(false), pause_(false), period_(std::chrono::milliseconds(100)) {} -ThreadBase::~ThreadBase() { - if (!stop_) { - Stop(); - } -} +ThreadBase::~ThreadBase() { Stop(); } void ThreadBase::Start() { + { + std::lock_guard lock(cv_mtx_); + if (running_) { + return; // Already running + } + running_ = true; + } + std::thread t(&ThreadBase::Run, this); thread_ = std::move(t); - stop_ = false; } void ThreadBase::Stop() { + { + std::lock_guard lock(cv_mtx_); + if (!running_) { + return; // Already stopped + } + running_ = false; + } + cv_.notify_all(); if (thread_.joinable()) { - stop_ = true; thread_.join(); } } @@ -27,7 +38,17 @@ void ThreadBase::Pause() { pause_ = true; } void ThreadBase::Resume() { pause_ = false; } +void ThreadBase::SetPeriod(std::chrono::milliseconds period) { + std::lock_guard lock(cv_mtx_); + period_ = period; +} + void ThreadBase::Run() { - while (!stop_ && Process()) { + while (running_) { + std::unique_lock lock(cv_mtx_); + cv_.wait_for(lock, period_, [this] { return !running_; }); + if (running_) { + Process(); + } } } \ No newline at end of file diff --git a/src/thread/thread_base.h b/src/thread/thread_base.h index 4f66002..6877417 100644 --- a/src/thread/thread_base.h +++ b/src/thread/thread_base.h @@ -2,6 +2,9 @@ #define _THREAD_BASE_H_ #include +#include +#include +#include #include class ThreadBase { @@ -16,6 +19,8 @@ class ThreadBase { void Pause(); void Resume(); + void SetPeriod(std::chrono::milliseconds period); + virtual bool Process() = 0; private: @@ -23,9 +28,13 @@ class ThreadBase { private: std::thread thread_; + std::chrono::milliseconds period_; - std::atomic stop_{false}; - std::atomic pause_{false}; + std::condition_variable cv_; + std::mutex cv_mtx_; + + std::atomic running_; + std::atomic pause_; }; #endif \ No newline at end of file diff --git a/src/transport/ice_transport.cpp b/src/transport/ice_transport.cpp index 2787dc8..987fd36 100644 --- a/src/transport/ice_transport.cpp +++ b/src/transport/ice_transport.cpp @@ -423,20 +423,28 @@ int IceTransport::DestroyIceTransmission() { ice_io_statistics_->Stop(); } - if (rtp_video_receiver_) { - rtp_video_receiver_->Stop(); + if (video_channel_send_) { + video_channel_send_->Destroy(); } - if (rtp_video_sender_) { - rtp_video_sender_->Stop(); + if (audio_channel_send_) { + audio_channel_send_->Destroy(); } - if (rtp_audio_sender_) { - rtp_audio_sender_->Stop(); + if (data_channel_send_) { + data_channel_send_->Destroy(); } - if (rtp_data_sender_) { - rtp_data_sender_->Stop(); + if (video_channel_receive_) { + video_channel_receive_->Destroy(); + } + + if (audio_channel_receive_) { + audio_channel_receive_->Destroy(); + } + + if (data_channel_receive_) { + data_channel_receive_->Destroy(); } return ice_agent_->DestroyIceAgent();