From 160ee9feef5aaa3c543b8d86af6d9f2f65b5ec41 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Mon, 24 Mar 2025 17:39:29 +0800 Subject: [PATCH] [fix] fix crash due to thread releasing --- src/pc/peer_connection.cpp | 4 +- src/rtp/rtp_statistics/rtp_statistics.cpp | 5 +- src/thread/thread_base.cpp | 20 ++++- src/thread/thread_base.h | 4 + src/transport/channel/rtp_audio_sender.cpp | 1 + src/transport/channel/rtp_data_sender.cpp | 1 + src/transport/channel/rtp_video_receiver.cpp | 27 ++++-- src/transport/channel/rtp_video_receiver.h | 4 + src/transport/channel/rtp_video_sender.cpp | 1 + .../channel/video_channel_receive.cpp | 1 + src/transport/channel/video_channel_send.cpp | 6 +- src/transport/ice_transport.cpp | 4 + src/transport/ice_transport_controller.cpp | 86 +++++++++++++------ src/transport/ice_transport_controller.h | 1 + .../packet_sender/packet_sender_imp.cpp | 2 +- 15 files changed, 126 insertions(+), 41 deletions(-) diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index 5972bb5..2cc1477 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -629,8 +629,8 @@ void PeerConnection::ProcessIceWorkMsg(const IceWorkMsg &msg) { } case IceWorkMsg::Type::UserLeaveTransmission: { std::string user_id = msg.user_id; - LOG_INFO("Receive notification: user id [{}] leave transmission", - user_id); + LOG_INFO("[{}] Receive notification: user id [{}] leave transmission", + (void *)this, user_id); auto user_id_it = ice_transport_list_.find(user_id); if (user_id_it != ice_transport_list_.end()) { user_id_it->second->DestroyIceTransmission(); diff --git a/src/rtp/rtp_statistics/rtp_statistics.cpp b/src/rtp/rtp_statistics/rtp_statistics.cpp index 36c0880..6143dc3 100644 --- a/src/rtp/rtp_statistics/rtp_statistics.cpp +++ b/src/rtp/rtp_statistics/rtp_statistics.cpp @@ -2,7 +2,10 @@ #include "log.h" -RtpStatistics::RtpStatistics() {} +RtpStatistics::RtpStatistics() { + SetPeriod(std::chrono::milliseconds(1000)); + SetThreadName("RtpStatistics"); +} RtpStatistics::~RtpStatistics() {} diff --git a/src/thread/thread_base.cpp b/src/thread/thread_base.cpp index 3160d77..a4220c3 100644 --- a/src/thread/thread_base.cpp +++ b/src/thread/thread_base.cpp @@ -3,7 +3,10 @@ #include "log.h" ThreadBase::ThreadBase() - : running_(false), pause_(false), period_(std::chrono::milliseconds(100)) {} + : running_(false), + pause_(false), + period_(std::chrono::milliseconds(100)), + thread_name_("UnnamedThread") {} ThreadBase::~ThreadBase() { Stop(); } @@ -11,7 +14,7 @@ void ThreadBase::Start() { { std::lock_guard lock(cv_mtx_); if (running_) { - return; // Already running + return; } running_ = true; } @@ -24,10 +27,11 @@ void ThreadBase::Stop() { { std::lock_guard lock(cv_mtx_); if (!running_) { - return; // Already stopped + return; } running_ = false; } + cv_.notify_all(); if (thread_.joinable()) { thread_.join(); @@ -43,6 +47,16 @@ void ThreadBase::SetPeriod(std::chrono::milliseconds period) { period_ = period; } +void ThreadBase::SetThreadName(const std::string& name) { + std::lock_guard lock(cv_mtx_); + thread_name_ = name; +} + +std::string ThreadBase::GetThreadName() { + std::lock_guard lock(cv_mtx_); + return thread_name_; +} + void ThreadBase::Run() { while (running_) { std::unique_lock lock(cv_mtx_); diff --git a/src/thread/thread_base.h b/src/thread/thread_base.h index 6877417..4bff0d8 100644 --- a/src/thread/thread_base.h +++ b/src/thread/thread_base.h @@ -5,6 +5,7 @@ #include #include #include +#include #include class ThreadBase { @@ -20,6 +21,8 @@ class ThreadBase { void Resume(); void SetPeriod(std::chrono::milliseconds period); + void SetThreadName(const std::string& name); + std::string GetThreadName(); virtual bool Process() = 0; @@ -29,6 +32,7 @@ class ThreadBase { private: std::thread thread_; std::chrono::milliseconds period_; + std::string thread_name_; std::condition_variable cv_; std::mutex cv_mtx_; diff --git a/src/transport/channel/rtp_audio_sender.cpp b/src/transport/channel/rtp_audio_sender.cpp index a0f5e90..7d58ae7 100644 --- a/src/transport/channel/rtp_audio_sender.cpp +++ b/src/transport/channel/rtp_audio_sender.cpp @@ -12,6 +12,7 @@ RtpAudioSender::RtpAudioSender() { SetPeriod(std::chrono::milliseconds(5)); } RtpAudioSender::RtpAudioSender(std::shared_ptr io_statistics) : ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) { SetPeriod(std::chrono::milliseconds(5)); + SetThreadName("RtpAudioSender"); } RtpAudioSender::~RtpAudioSender() { diff --git a/src/transport/channel/rtp_data_sender.cpp b/src/transport/channel/rtp_data_sender.cpp index 1739e42..6b869a0 100644 --- a/src/transport/channel/rtp_data_sender.cpp +++ b/src/transport/channel/rtp_data_sender.cpp @@ -12,6 +12,7 @@ RtpDataSender::RtpDataSender() {} RtpDataSender::RtpDataSender(std::shared_ptr io_statistics) : ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) { SetPeriod(std::chrono::milliseconds(5)); + SetThreadName("RtpDataSender"); } RtpDataSender::~RtpDataSender() { diff --git a/src/transport/channel/rtp_video_receiver.cpp b/src/transport/channel/rtp_video_receiver.cpp index 3083272..17009dd 100644 --- a/src/transport/channel/rtp_video_receiver.cpp +++ b/src/transport/channel/rtp_video_receiver.cpp @@ -17,6 +17,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock) : ssrc_(GenerateUniqueSsrc()), active_remb_module_(nullptr), + is_running_(true), receive_side_congestion_controller_( clock_, [this](std::vector> packets) { @@ -35,6 +36,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock) clock->CurrentTimeMs()), clock_(webrtc::Clock::GetWebrtcClockShared(clock)) { SetPeriod(std::chrono::milliseconds(5)); + SetThreadName("RtpVideoReceiver"); rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); } @@ -42,6 +44,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock, std::shared_ptr io_statistics) : io_statistics_(io_statistics), ssrc_(GenerateUniqueSsrc()), + is_running_(true), receive_side_congestion_controller_( clock_, [this](std::vector> packets) { @@ -58,6 +61,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock, nack_(std::make_unique(clock_, this, this)), clock_(webrtc::Clock::GetWebrtcClockShared(clock)) { SetPeriod(std::chrono::milliseconds(5)); + SetThreadName("RtpVideoReceiver"); rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); #ifdef SAVE_RTP_RECV_STREAM @@ -69,11 +73,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock, } RtpVideoReceiver::~RtpVideoReceiver() { - rtcp_stop_.store(true); - rtcp_cv_.notify_all(); - if (rtcp_thread_.joinable()) { - rtcp_thread_.join(); - } + StopRtcp(); SSRCManager::Instance().DeleteSsrc(ssrc_); @@ -670,6 +670,10 @@ void RtpVideoReceiver::CheckIsTimeUpdateNack(uint32_t now) { } bool RtpVideoReceiver::Process() { + if (!is_running_.load()) { + return false; + } + if (!compelete_video_frame_queue_.isEmpty()) { std::optional video_frame = compelete_video_frame_queue_.pop(); @@ -773,6 +777,19 @@ void RtpVideoReceiver::SendRR() { last_report_cumulative_loss_ = cumulative_loss_; } +void RtpVideoReceiver::StopRtcp() { + is_running_.store(false); + if (rtcp_stop_.load()) { + return; + } + + rtcp_stop_.store(true); + rtcp_cv_.notify_all(); + if (rtcp_thread_.joinable()) { + rtcp_thread_.join(); + } +} + void RtpVideoReceiver::RtcpThread() { while (!rtcp_stop_.load()) { std::unique_lock lock(rtcp_mtx_); diff --git a/src/transport/channel/rtp_video_receiver.h b/src/transport/channel/rtp_video_receiver.h index 47350f9..deb38f2 100644 --- a/src/transport/channel/rtp_video_receiver.h +++ b/src/transport/channel/rtp_video_receiver.h @@ -48,6 +48,9 @@ class RtpVideoReceiver : public ThreadBase, } uint32_t GetSsrc() { return ssrc_; } uint32_t GetRemoteSsrc() { return remote_ssrc_; } + + void StopRtcp(); + void OnSenderReport(const SenderReport& sender_report); private: @@ -134,6 +137,7 @@ class RtpVideoReceiver : public ThreadBase, std::atomic rtcp_stop_ = false; int rtcp_rr_interval_ms_ = 5000; int rtcp_tcc_interval_ms_ = 200; + std::atomic is_running_; private: uint32_t ssrc_ = 0; diff --git a/src/transport/channel/rtp_video_sender.cpp b/src/transport/channel/rtp_video_sender.cpp index a766846..a7b7876 100644 --- a/src/transport/channel/rtp_video_sender.cpp +++ b/src/transport/channel/rtp_video_sender.cpp @@ -18,6 +18,7 @@ RtpVideoSender::RtpVideoSender(std::shared_ptr clock, io_statistics_(io_statistics), clock_(webrtc::Clock::GetWebrtcClockShared(clock)) { SetPeriod(std::chrono::milliseconds(5)); + SetThreadName("RtpVideoSender"); #ifdef SAVE_RTP_SENT_STREAM file_rtp_sent_ = fopen("rtp_sent_stream.h264", "w+b"); if (!file_rtp_sent_) { diff --git a/src/transport/channel/video_channel_receive.cpp b/src/transport/channel/video_channel_receive.cpp index 63f3e1d..415b0f5 100644 --- a/src/transport/channel/video_channel_receive.cpp +++ b/src/transport/channel/video_channel_receive.cpp @@ -46,6 +46,7 @@ void VideoChannelReceive::Initialize(rtp::PAYLOAD_TYPE payload_type) { void VideoChannelReceive::Destroy() { if (rtp_video_receiver_) { + rtp_video_receiver_->StopRtcp(); rtp_video_receiver_->Stop(); } } diff --git a/src/transport/channel/video_channel_send.cpp b/src/transport/channel/video_channel_send.cpp index f225fc3..5cba365 100644 --- a/src/transport/channel/video_channel_send.cpp +++ b/src/transport/channel/video_channel_send.cpp @@ -106,7 +106,7 @@ void VideoChannelSend::Destroy() { } int VideoChannelSend::SendVideo(std::shared_ptr encoded_frame) { - if (rtp_video_sender_ && rtp_packetizer_) { + if (rtp_video_sender_ && rtp_packetizer_ && packet_sender_) { int32_t rtp_timestamp = delta_ntp_internal_ms_ + static_cast(encoded_frame->CapturedTimestamp() / 1000); @@ -158,7 +158,9 @@ int32_t VideoChannelSend::ReSendPacket(uint16_t packet_id) { std::vector> packets; packets.emplace_back(std::move(packet)); - packet_sender_->EnqueueRtpPacket(std::move(packets)); + if (packet_sender_) { + packet_sender_->EnqueueRtpPacket(std::move(packets)); + } return packet_size; } \ No newline at end of file diff --git a/src/transport/ice_transport.cpp b/src/transport/ice_transport.cpp index b40102d..a795396 100644 --- a/src/transport/ice_transport.cpp +++ b/src/transport/ice_transport.cpp @@ -434,6 +434,10 @@ int IceTransport::DestroyIceTransmission() { ice_io_statistics_->Stop(); } + if (ice_transport_controller_) { + ice_transport_controller_->Destroy(); + } + return ice_agent_->DestroyIceAgent(); } diff --git a/src/transport/ice_transport_controller.cpp b/src/transport/ice_transport_controller.cpp index 6ef4134..c1c58d2 100644 --- a/src/transport/ice_transport_controller.cpp +++ b/src/transport/ice_transport_controller.cpp @@ -19,8 +19,10 @@ IceTransportController::IceTransportController( audio_codec_inited_(false), load_nvcodec_dll_success_(false), hardware_acceleration_(false), + is_running_(true), congestion_window_size_(DataSize::PlusInfinity()) { SetPeriod(std::chrono::milliseconds(25)); + SetThreadName("IceTransportController"); } IceTransportController::~IceTransportController() { @@ -92,25 +94,38 @@ void IceTransportController::Create( OnSentRtpPacket(packet); }); - packet_sender_->SetGeneratePaddingFunc( - [this](uint32_t size, int64_t captured_timestamp_us) - -> std::vector> { - return video_channel_send_->GeneratePadding(size, - captured_timestamp_us); - }); + if (packet_sender_) { + packet_sender_->SetGeneratePaddingFunc( + [this](uint32_t size, int64_t captured_timestamp_us) + -> std::vector> { + return video_channel_send_->GeneratePadding(size, + captured_timestamp_us); + }); + } audio_channel_send_ = std::make_unique( ice_agent, packet_sender_, ice_io_statistics); data_channel_send_ = std::make_unique( ice_agent, packet_sender_, ice_io_statistics); - video_channel_send_->Initialize(video_codec_payload_type); - audio_channel_send_->Initialize(rtp::PAYLOAD_TYPE::OPUS); - data_channel_send_->Initialize(rtp::PAYLOAD_TYPE::DATA); + if (video_channel_send_) { + video_channel_send_->Initialize(video_codec_payload_type); + video_channel_send_->SetEnqueuePacketsFunc( + [this](std::vector>& packets) + -> void { + if (packet_sender_) { + packet_sender_->EnqueuePackets(std::move(packets)); + } + }); + } - video_channel_send_->SetEnqueuePacketsFunc( - [this](std::vector>& packets) - -> void { packet_sender_->EnqueuePackets(std::move(packets)); }); + if (audio_channel_send_) { + audio_channel_send_->Initialize(rtp::PAYLOAD_TYPE::OPUS); + } + + if (data_channel_send_) { + data_channel_send_->Initialize(rtp::PAYLOAD_TYPE::DATA); + } std::weak_ptr weak_self = shared_from_this(); video_channel_receive_ = std::make_unique( @@ -143,6 +158,8 @@ void IceTransportController::Create( } void IceTransportController::Destroy() { + is_running_.store(false); + if (video_channel_send_) { video_channel_send_->Destroy(); } @@ -166,6 +183,8 @@ void IceTransportController::Destroy() { if (data_channel_receive_) { data_channel_receive_->Destroy(); } + + Stop(); } int IceTransportController::SendVideo(const XVideoFrame* video_frame) { @@ -256,6 +275,9 @@ void IceTransportController::UpdateNetworkAvaliablity(bool network_available) { webrtc::Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); msg.network_available = network_available; controller_->OnNetworkAvailability(msg); + } + + if (packet_sender_) { packet_sender_->EnsureStarted(); } } @@ -484,11 +506,13 @@ void IceTransportController::OnReceiverReport( msg.start_time = last_report_block_time_; msg.end_time = now; - task_queue_->PostTask([this, msg]() mutable { - if (controller_) { - PostUpdates(controller_->OnTransportLossReport(msg)); - } - }); + if (task_queue_) { + task_queue_->PostTask([this, msg]() mutable { + if (controller_) { + PostUpdates(controller_->OnTransportLossReport(msg)); + } + }); + } last_report_block_time_ = now; } @@ -498,7 +522,7 @@ void IceTransportController::OnCongestionControlFeedback( std::optional feedback_msg = transport_feedback_adapter_.ProcessCongestionControlFeedback( feedback, Timestamp::Micros(clock_->CurrentTimeUs())); - if (feedback_msg.has_value()) { + if (feedback_msg.has_value() && task_queue_) { task_queue_->PostTask([this, feedback_msg]() mutable { if (controller_) { PostUpdates( @@ -543,12 +567,12 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) { UpdateCongestedState(); } - if (update.pacer_config) { + if (update.pacer_config && packet_sender_) { packet_sender_->SetPacingRates(update.pacer_config->data_rate(), update.pacer_config->pad_rate()); } - if (!update.probe_cluster_configs.empty()) { + if (!update.probe_cluster_configs.empty() && packet_sender_) { packet_sender_->CreateProbeClusters( std::move(update.probe_cluster_configs)); } @@ -559,7 +583,7 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) { ? target_bitrate_ : update.target_rate->target_rate.bps()) : target_bitrate_; - if (target_bitrate != target_bitrate_) { + if (target_bitrate != target_bitrate_ && video_encoder_) { target_bitrate_ = target_bitrate; int width, height, target_width, target_height; video_encoder_->GetResolution(&width, &height); @@ -591,7 +615,9 @@ void IceTransportController::UpdateControlState() { void IceTransportController::UpdateCongestedState() { if (auto update = GetCongestedStateUpdate()) { is_congested_ = update.value(); - packet_sender_->SetCongested(update.value()); + if (packet_sender_) { + packet_sender_->SetCongested(update.value()); + } } } @@ -603,11 +629,17 @@ std::optional IceTransportController::GetCongestedStateUpdate() const { } bool IceTransportController::Process() { - task_queue_->PostTask([this]() mutable { - webrtc::ProcessInterval msg; - msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); - PostUpdates(controller_->OnProcessInterval(msg)); - }); + if (!is_running_.load()) { + return false; + } + + if (task_queue_ && controller_) { + task_queue_->PostTask([this]() mutable { + webrtc::ProcessInterval msg; + msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); + PostUpdates(controller_->OnProcessInterval(msg)); + }); + } return true; } \ No newline at end of file diff --git a/src/transport/ice_transport_controller.h b/src/transport/ice_transport_controller.h index 3965f17..89f8bc1 100644 --- a/src/transport/ice_transport_controller.h +++ b/src/transport/ice_transport_controller.h @@ -112,6 +112,7 @@ class IceTransportController std::shared_ptr packet_sender_ = nullptr; std::string remote_user_id_; void *user_data_ = nullptr; + std::atomic is_running_; private: std::shared_ptr clock_; diff --git a/src/transport/packet_sender/packet_sender_imp.cpp b/src/transport/packet_sender/packet_sender_imp.cpp index 72045fe..86df7a3 100644 --- a/src/transport/packet_sender/packet_sender_imp.cpp +++ b/src/transport/packet_sender/packet_sender_imp.cpp @@ -22,7 +22,7 @@ PacketSenderImp::PacketSenderImp(std::shared_ptr ice_agent, last_call_time_(webrtc::Timestamp::Millis(0)), task_queue_(task_queue) {} -PacketSenderImp::~PacketSenderImp() {} +PacketSenderImp::~PacketSenderImp() { is_shutdown_ = true; } std::vector> PacketSenderImp::GeneratePadding(webrtc::DataSize size) {