[fix] fix crash due to thread releasing

This commit is contained in:
dijunkun
2025-03-24 17:39:29 +08:00
parent d17b29dfa4
commit 160ee9feef
15 changed files with 126 additions and 41 deletions

View File

@@ -629,8 +629,8 @@ void PeerConnection::ProcessIceWorkMsg(const IceWorkMsg &msg) {
} }
case IceWorkMsg::Type::UserLeaveTransmission: { case IceWorkMsg::Type::UserLeaveTransmission: {
std::string user_id = msg.user_id; std::string user_id = msg.user_id;
LOG_INFO("Receive notification: user id [{}] leave transmission", LOG_INFO("[{}] Receive notification: user id [{}] leave transmission",
user_id); (void *)this, user_id);
auto user_id_it = ice_transport_list_.find(user_id); auto user_id_it = ice_transport_list_.find(user_id);
if (user_id_it != ice_transport_list_.end()) { if (user_id_it != ice_transport_list_.end()) {
user_id_it->second->DestroyIceTransmission(); user_id_it->second->DestroyIceTransmission();

View File

@@ -2,7 +2,10 @@
#include "log.h" #include "log.h"
RtpStatistics::RtpStatistics() {} RtpStatistics::RtpStatistics() {
SetPeriod(std::chrono::milliseconds(1000));
SetThreadName("RtpStatistics");
}
RtpStatistics::~RtpStatistics() {} RtpStatistics::~RtpStatistics() {}

View File

@@ -3,7 +3,10 @@
#include "log.h" #include "log.h"
ThreadBase::ThreadBase() ThreadBase::ThreadBase()
: running_(false), pause_(false), period_(std::chrono::milliseconds(100)) {} : running_(false),
pause_(false),
period_(std::chrono::milliseconds(100)),
thread_name_("UnnamedThread") {}
ThreadBase::~ThreadBase() { Stop(); } ThreadBase::~ThreadBase() { Stop(); }
@@ -11,7 +14,7 @@ void ThreadBase::Start() {
{ {
std::lock_guard<std::mutex> lock(cv_mtx_); std::lock_guard<std::mutex> lock(cv_mtx_);
if (running_) { if (running_) {
return; // Already running return;
} }
running_ = true; running_ = true;
} }
@@ -24,10 +27,11 @@ void ThreadBase::Stop() {
{ {
std::lock_guard<std::mutex> lock(cv_mtx_); std::lock_guard<std::mutex> lock(cv_mtx_);
if (!running_) { if (!running_) {
return; // Already stopped return;
} }
running_ = false; running_ = false;
} }
cv_.notify_all(); cv_.notify_all();
if (thread_.joinable()) { if (thread_.joinable()) {
thread_.join(); thread_.join();
@@ -43,6 +47,16 @@ void ThreadBase::SetPeriod(std::chrono::milliseconds period) {
period_ = period; period_ = period;
} }
void ThreadBase::SetThreadName(const std::string& name) {
std::lock_guard<std::mutex> lock(cv_mtx_);
thread_name_ = name;
}
std::string ThreadBase::GetThreadName() {
std::lock_guard<std::mutex> lock(cv_mtx_);
return thread_name_;
}
void ThreadBase::Run() { void ThreadBase::Run() {
while (running_) { while (running_) {
std::unique_lock<std::mutex> lock(cv_mtx_); std::unique_lock<std::mutex> lock(cv_mtx_);

View File

@@ -5,6 +5,7 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <mutex> #include <mutex>
#include <string>
#include <thread> #include <thread>
class ThreadBase { class ThreadBase {
@@ -20,6 +21,8 @@ class ThreadBase {
void Resume(); void Resume();
void SetPeriod(std::chrono::milliseconds period); void SetPeriod(std::chrono::milliseconds period);
void SetThreadName(const std::string& name);
std::string GetThreadName();
virtual bool Process() = 0; virtual bool Process() = 0;
@@ -29,6 +32,7 @@ class ThreadBase {
private: private:
std::thread thread_; std::thread thread_;
std::chrono::milliseconds period_; std::chrono::milliseconds period_;
std::string thread_name_;
std::condition_variable cv_; std::condition_variable cv_;
std::mutex cv_mtx_; std::mutex cv_mtx_;

View File

@@ -12,6 +12,7 @@ RtpAudioSender::RtpAudioSender() { SetPeriod(std::chrono::milliseconds(5)); }
RtpAudioSender::RtpAudioSender(std::shared_ptr<IOStatistics> io_statistics) RtpAudioSender::RtpAudioSender(std::shared_ptr<IOStatistics> io_statistics)
: ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) { : ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) {
SetPeriod(std::chrono::milliseconds(5)); SetPeriod(std::chrono::milliseconds(5));
SetThreadName("RtpAudioSender");
} }
RtpAudioSender::~RtpAudioSender() { RtpAudioSender::~RtpAudioSender() {

View File

@@ -12,6 +12,7 @@ RtpDataSender::RtpDataSender() {}
RtpDataSender::RtpDataSender(std::shared_ptr<IOStatistics> io_statistics) RtpDataSender::RtpDataSender(std::shared_ptr<IOStatistics> io_statistics)
: ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) { : ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) {
SetPeriod(std::chrono::milliseconds(5)); SetPeriod(std::chrono::milliseconds(5));
SetThreadName("RtpDataSender");
} }
RtpDataSender::~RtpDataSender() { RtpDataSender::~RtpDataSender() {

View File

@@ -17,6 +17,7 @@
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock) RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock)
: ssrc_(GenerateUniqueSsrc()), : ssrc_(GenerateUniqueSsrc()),
active_remb_module_(nullptr), active_remb_module_(nullptr),
is_running_(true),
receive_side_congestion_controller_( receive_side_congestion_controller_(
clock_, clock_,
[this](std::vector<std::unique_ptr<RtcpPacket>> packets) { [this](std::vector<std::unique_ptr<RtcpPacket>> packets) {
@@ -35,6 +36,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock)
clock->CurrentTimeMs()), clock->CurrentTimeMs()),
clock_(webrtc::Clock::GetWebrtcClockShared(clock)) { clock_(webrtc::Clock::GetWebrtcClockShared(clock)) {
SetPeriod(std::chrono::milliseconds(5)); SetPeriod(std::chrono::milliseconds(5));
SetThreadName("RtpVideoReceiver");
rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this);
} }
@@ -42,6 +44,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock,
std::shared_ptr<IOStatistics> io_statistics) std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics), : io_statistics_(io_statistics),
ssrc_(GenerateUniqueSsrc()), ssrc_(GenerateUniqueSsrc()),
is_running_(true),
receive_side_congestion_controller_( receive_side_congestion_controller_(
clock_, clock_,
[this](std::vector<std::unique_ptr<RtcpPacket>> packets) { [this](std::vector<std::unique_ptr<RtcpPacket>> packets) {
@@ -58,6 +61,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock,
nack_(std::make_unique<NackRequester>(clock_, this, this)), nack_(std::make_unique<NackRequester>(clock_, this, this)),
clock_(webrtc::Clock::GetWebrtcClockShared(clock)) { clock_(webrtc::Clock::GetWebrtcClockShared(clock)) {
SetPeriod(std::chrono::milliseconds(5)); SetPeriod(std::chrono::milliseconds(5));
SetThreadName("RtpVideoReceiver");
rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this); rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this);
#ifdef SAVE_RTP_RECV_STREAM #ifdef SAVE_RTP_RECV_STREAM
@@ -69,11 +73,7 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock,
} }
RtpVideoReceiver::~RtpVideoReceiver() { RtpVideoReceiver::~RtpVideoReceiver() {
rtcp_stop_.store(true); StopRtcp();
rtcp_cv_.notify_all();
if (rtcp_thread_.joinable()) {
rtcp_thread_.join();
}
SSRCManager::Instance().DeleteSsrc(ssrc_); SSRCManager::Instance().DeleteSsrc(ssrc_);
@@ -670,6 +670,10 @@ void RtpVideoReceiver::CheckIsTimeUpdateNack(uint32_t now) {
} }
bool RtpVideoReceiver::Process() { bool RtpVideoReceiver::Process() {
if (!is_running_.load()) {
return false;
}
if (!compelete_video_frame_queue_.isEmpty()) { if (!compelete_video_frame_queue_.isEmpty()) {
std::optional<ReceivedFrame> video_frame = std::optional<ReceivedFrame> video_frame =
compelete_video_frame_queue_.pop(); compelete_video_frame_queue_.pop();
@@ -773,6 +777,19 @@ void RtpVideoReceiver::SendRR() {
last_report_cumulative_loss_ = cumulative_loss_; last_report_cumulative_loss_ = cumulative_loss_;
} }
void RtpVideoReceiver::StopRtcp() {
is_running_.store(false);
if (rtcp_stop_.load()) {
return;
}
rtcp_stop_.store(true);
rtcp_cv_.notify_all();
if (rtcp_thread_.joinable()) {
rtcp_thread_.join();
}
}
void RtpVideoReceiver::RtcpThread() { void RtpVideoReceiver::RtcpThread() {
while (!rtcp_stop_.load()) { while (!rtcp_stop_.load()) {
std::unique_lock<std::mutex> lock(rtcp_mtx_); std::unique_lock<std::mutex> lock(rtcp_mtx_);

View File

@@ -48,6 +48,9 @@ class RtpVideoReceiver : public ThreadBase,
} }
uint32_t GetSsrc() { return ssrc_; } uint32_t GetSsrc() { return ssrc_; }
uint32_t GetRemoteSsrc() { return remote_ssrc_; } uint32_t GetRemoteSsrc() { return remote_ssrc_; }
void StopRtcp();
void OnSenderReport(const SenderReport& sender_report); void OnSenderReport(const SenderReport& sender_report);
private: private:
@@ -134,6 +137,7 @@ class RtpVideoReceiver : public ThreadBase,
std::atomic<bool> rtcp_stop_ = false; std::atomic<bool> rtcp_stop_ = false;
int rtcp_rr_interval_ms_ = 5000; int rtcp_rr_interval_ms_ = 5000;
int rtcp_tcc_interval_ms_ = 200; int rtcp_tcc_interval_ms_ = 200;
std::atomic<bool> is_running_;
private: private:
uint32_t ssrc_ = 0; uint32_t ssrc_ = 0;

View File

@@ -18,6 +18,7 @@ RtpVideoSender::RtpVideoSender(std::shared_ptr<SystemClock> clock,
io_statistics_(io_statistics), io_statistics_(io_statistics),
clock_(webrtc::Clock::GetWebrtcClockShared(clock)) { clock_(webrtc::Clock::GetWebrtcClockShared(clock)) {
SetPeriod(std::chrono::milliseconds(5)); SetPeriod(std::chrono::milliseconds(5));
SetThreadName("RtpVideoSender");
#ifdef SAVE_RTP_SENT_STREAM #ifdef SAVE_RTP_SENT_STREAM
file_rtp_sent_ = fopen("rtp_sent_stream.h264", "w+b"); file_rtp_sent_ = fopen("rtp_sent_stream.h264", "w+b");
if (!file_rtp_sent_) { if (!file_rtp_sent_) {

View File

@@ -46,6 +46,7 @@ void VideoChannelReceive::Initialize(rtp::PAYLOAD_TYPE payload_type) {
void VideoChannelReceive::Destroy() { void VideoChannelReceive::Destroy() {
if (rtp_video_receiver_) { if (rtp_video_receiver_) {
rtp_video_receiver_->StopRtcp();
rtp_video_receiver_->Stop(); rtp_video_receiver_->Stop();
} }
} }

View File

@@ -106,7 +106,7 @@ void VideoChannelSend::Destroy() {
} }
int VideoChannelSend::SendVideo(std::shared_ptr<EncodedFrame> encoded_frame) { int VideoChannelSend::SendVideo(std::shared_ptr<EncodedFrame> encoded_frame) {
if (rtp_video_sender_ && rtp_packetizer_) { if (rtp_video_sender_ && rtp_packetizer_ && packet_sender_) {
int32_t rtp_timestamp = int32_t rtp_timestamp =
delta_ntp_internal_ms_ + delta_ntp_internal_ms_ +
static_cast<uint32_t>(encoded_frame->CapturedTimestamp() / 1000); static_cast<uint32_t>(encoded_frame->CapturedTimestamp() / 1000);
@@ -158,7 +158,9 @@ int32_t VideoChannelSend::ReSendPacket(uint16_t packet_id) {
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets; std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets;
packets.emplace_back(std::move(packet)); packets.emplace_back(std::move(packet));
if (packet_sender_) {
packet_sender_->EnqueueRtpPacket(std::move(packets)); packet_sender_->EnqueueRtpPacket(std::move(packets));
}
return packet_size; return packet_size;
} }

View File

@@ -434,6 +434,10 @@ int IceTransport::DestroyIceTransmission() {
ice_io_statistics_->Stop(); ice_io_statistics_->Stop();
} }
if (ice_transport_controller_) {
ice_transport_controller_->Destroy();
}
return ice_agent_->DestroyIceAgent(); return ice_agent_->DestroyIceAgent();
} }

View File

@@ -19,8 +19,10 @@ IceTransportController::IceTransportController(
audio_codec_inited_(false), audio_codec_inited_(false),
load_nvcodec_dll_success_(false), load_nvcodec_dll_success_(false),
hardware_acceleration_(false), hardware_acceleration_(false),
is_running_(true),
congestion_window_size_(DataSize::PlusInfinity()) { congestion_window_size_(DataSize::PlusInfinity()) {
SetPeriod(std::chrono::milliseconds(25)); SetPeriod(std::chrono::milliseconds(25));
SetThreadName("IceTransportController");
} }
IceTransportController::~IceTransportController() { IceTransportController::~IceTransportController() {
@@ -92,25 +94,38 @@ void IceTransportController::Create(
OnSentRtpPacket(packet); OnSentRtpPacket(packet);
}); });
if (packet_sender_) {
packet_sender_->SetGeneratePaddingFunc( packet_sender_->SetGeneratePaddingFunc(
[this](uint32_t size, int64_t captured_timestamp_us) [this](uint32_t size, int64_t captured_timestamp_us)
-> std::vector<std::unique_ptr<RtpPacket>> { -> std::vector<std::unique_ptr<RtpPacket>> {
return video_channel_send_->GeneratePadding(size, return video_channel_send_->GeneratePadding(size,
captured_timestamp_us); captured_timestamp_us);
}); });
}
audio_channel_send_ = std::make_unique<AudioChannelSend>( audio_channel_send_ = std::make_unique<AudioChannelSend>(
ice_agent, packet_sender_, ice_io_statistics); ice_agent, packet_sender_, ice_io_statistics);
data_channel_send_ = std::make_unique<DataChannelSend>( data_channel_send_ = std::make_unique<DataChannelSend>(
ice_agent, packet_sender_, ice_io_statistics); ice_agent, packet_sender_, ice_io_statistics);
if (video_channel_send_) {
video_channel_send_->Initialize(video_codec_payload_type); video_channel_send_->Initialize(video_codec_payload_type);
audio_channel_send_->Initialize(rtp::PAYLOAD_TYPE::OPUS);
data_channel_send_->Initialize(rtp::PAYLOAD_TYPE::DATA);
video_channel_send_->SetEnqueuePacketsFunc( video_channel_send_->SetEnqueuePacketsFunc(
[this](std::vector<std::unique_ptr<webrtc::RtpPacketToSend>>& packets) [this](std::vector<std::unique_ptr<webrtc::RtpPacketToSend>>& packets)
-> void { packet_sender_->EnqueuePackets(std::move(packets)); }); -> void {
if (packet_sender_) {
packet_sender_->EnqueuePackets(std::move(packets));
}
});
}
if (audio_channel_send_) {
audio_channel_send_->Initialize(rtp::PAYLOAD_TYPE::OPUS);
}
if (data_channel_send_) {
data_channel_send_->Initialize(rtp::PAYLOAD_TYPE::DATA);
}
std::weak_ptr<IceTransportController> weak_self = shared_from_this(); std::weak_ptr<IceTransportController> weak_self = shared_from_this();
video_channel_receive_ = std::make_unique<VideoChannelReceive>( video_channel_receive_ = std::make_unique<VideoChannelReceive>(
@@ -143,6 +158,8 @@ void IceTransportController::Create(
} }
void IceTransportController::Destroy() { void IceTransportController::Destroy() {
is_running_.store(false);
if (video_channel_send_) { if (video_channel_send_) {
video_channel_send_->Destroy(); video_channel_send_->Destroy();
} }
@@ -166,6 +183,8 @@ void IceTransportController::Destroy() {
if (data_channel_receive_) { if (data_channel_receive_) {
data_channel_receive_->Destroy(); data_channel_receive_->Destroy();
} }
Stop();
} }
int IceTransportController::SendVideo(const XVideoFrame* video_frame) { int IceTransportController::SendVideo(const XVideoFrame* video_frame) {
@@ -256,6 +275,9 @@ void IceTransportController::UpdateNetworkAvaliablity(bool network_available) {
webrtc::Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); webrtc::Timestamp::Millis(webrtc_clock_->TimeInMilliseconds());
msg.network_available = network_available; msg.network_available = network_available;
controller_->OnNetworkAvailability(msg); controller_->OnNetworkAvailability(msg);
}
if (packet_sender_) {
packet_sender_->EnsureStarted(); packet_sender_->EnsureStarted();
} }
} }
@@ -484,11 +506,13 @@ void IceTransportController::OnReceiverReport(
msg.start_time = last_report_block_time_; msg.start_time = last_report_block_time_;
msg.end_time = now; msg.end_time = now;
if (task_queue_) {
task_queue_->PostTask([this, msg]() mutable { task_queue_->PostTask([this, msg]() mutable {
if (controller_) { if (controller_) {
PostUpdates(controller_->OnTransportLossReport(msg)); PostUpdates(controller_->OnTransportLossReport(msg));
} }
}); });
}
last_report_block_time_ = now; last_report_block_time_ = now;
} }
@@ -498,7 +522,7 @@ void IceTransportController::OnCongestionControlFeedback(
std::optional<webrtc::TransportPacketsFeedback> feedback_msg = std::optional<webrtc::TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessCongestionControlFeedback( transport_feedback_adapter_.ProcessCongestionControlFeedback(
feedback, Timestamp::Micros(clock_->CurrentTimeUs())); feedback, Timestamp::Micros(clock_->CurrentTimeUs()));
if (feedback_msg.has_value()) { if (feedback_msg.has_value() && task_queue_) {
task_queue_->PostTask([this, feedback_msg]() mutable { task_queue_->PostTask([this, feedback_msg]() mutable {
if (controller_) { if (controller_) {
PostUpdates( PostUpdates(
@@ -543,12 +567,12 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) {
UpdateCongestedState(); UpdateCongestedState();
} }
if (update.pacer_config) { if (update.pacer_config && packet_sender_) {
packet_sender_->SetPacingRates(update.pacer_config->data_rate(), packet_sender_->SetPacingRates(update.pacer_config->data_rate(),
update.pacer_config->pad_rate()); update.pacer_config->pad_rate());
} }
if (!update.probe_cluster_configs.empty()) { if (!update.probe_cluster_configs.empty() && packet_sender_) {
packet_sender_->CreateProbeClusters( packet_sender_->CreateProbeClusters(
std::move(update.probe_cluster_configs)); std::move(update.probe_cluster_configs));
} }
@@ -559,7 +583,7 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) {
? target_bitrate_ ? target_bitrate_
: update.target_rate->target_rate.bps()) : update.target_rate->target_rate.bps())
: target_bitrate_; : target_bitrate_;
if (target_bitrate != target_bitrate_) { if (target_bitrate != target_bitrate_ && video_encoder_) {
target_bitrate_ = target_bitrate; target_bitrate_ = target_bitrate;
int width, height, target_width, target_height; int width, height, target_width, target_height;
video_encoder_->GetResolution(&width, &height); video_encoder_->GetResolution(&width, &height);
@@ -591,9 +615,11 @@ void IceTransportController::UpdateControlState() {
void IceTransportController::UpdateCongestedState() { void IceTransportController::UpdateCongestedState() {
if (auto update = GetCongestedStateUpdate()) { if (auto update = GetCongestedStateUpdate()) {
is_congested_ = update.value(); is_congested_ = update.value();
if (packet_sender_) {
packet_sender_->SetCongested(update.value()); packet_sender_->SetCongested(update.value());
} }
} }
}
std::optional<bool> IceTransportController::GetCongestedStateUpdate() const { std::optional<bool> IceTransportController::GetCongestedStateUpdate() const {
bool congested = transport_feedback_adapter_.GetOutstandingData() >= bool congested = transport_feedback_adapter_.GetOutstandingData() >=
@@ -603,11 +629,17 @@ std::optional<bool> IceTransportController::GetCongestedStateUpdate() const {
} }
bool IceTransportController::Process() { bool IceTransportController::Process() {
if (!is_running_.load()) {
return false;
}
if (task_queue_ && controller_) {
task_queue_->PostTask([this]() mutable { task_queue_->PostTask([this]() mutable {
webrtc::ProcessInterval msg; webrtc::ProcessInterval msg;
msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds());
PostUpdates(controller_->OnProcessInterval(msg)); PostUpdates(controller_->OnProcessInterval(msg));
}); });
}
return true; return true;
} }

View File

@@ -112,6 +112,7 @@ class IceTransportController
std::shared_ptr<PacketSenderImp> packet_sender_ = nullptr; std::shared_ptr<PacketSenderImp> packet_sender_ = nullptr;
std::string remote_user_id_; std::string remote_user_id_;
void *user_data_ = nullptr; void *user_data_ = nullptr;
std::atomic<bool> is_running_;
private: private:
std::shared_ptr<SystemClock> clock_; std::shared_ptr<SystemClock> clock_;

View File

@@ -22,7 +22,7 @@ PacketSenderImp::PacketSenderImp(std::shared_ptr<IceAgent> ice_agent,
last_call_time_(webrtc::Timestamp::Millis(0)), last_call_time_(webrtc::Timestamp::Millis(0)),
task_queue_(task_queue) {} task_queue_(task_queue) {}
PacketSenderImp::~PacketSenderImp() {} PacketSenderImp::~PacketSenderImp() { is_shutdown_ = true; }
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> std::vector<std::unique_ptr<webrtc::RtpPacketToSend>>
PacketSenderImp::GeneratePadding(webrtc::DataSize size) { PacketSenderImp::GeneratePadding(webrtc::DataSize size) {