[fix] fix crash due to rtp receivers destroy

This commit is contained in:
dijunkun
2025-02-06 17:08:26 +08:00
parent 1d85247785
commit 3accdf2192
8 changed files with 82 additions and 36 deletions

View File

@@ -6,10 +6,12 @@
#define RTCP_SR_INTERVAL 1000
RtpAudioSender::RtpAudioSender() {}
RtpAudioSender::RtpAudioSender() { SetPeriod(std::chrono::milliseconds(5)); }
RtpAudioSender::RtpAudioSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
: io_statistics_(io_statistics) {
SetPeriod(std::chrono::milliseconds(5));
}
RtpAudioSender::~RtpAudioSender() {
if (rtp_statistics_) {
@@ -140,6 +142,5 @@ bool RtpAudioSender::Process() {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}

View File

@@ -9,7 +9,9 @@
RtpDataSender::RtpDataSender() {}
RtpDataSender::RtpDataSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
: io_statistics_(io_statistics) {
SetPeriod(std::chrono::milliseconds(5));
}
RtpDataSender::~RtpDataSender() {
if (rtp_statistics_) {
@@ -140,6 +142,5 @@ bool RtpDataSender::Process() {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}

View File

@@ -20,7 +20,10 @@ RtpVideoReceiver::RtpVideoReceiver()
[this](int64_t bitrate_bps, std::vector<uint32_t> ssrcs) {
SendRemb(bitrate_bps, ssrcs);
}),
clock_(Clock::GetRealTimeClockShared()) {}
clock_(Clock::GetRealTimeClockShared()) {
SetPeriod(std::chrono::milliseconds(5));
// rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this);
}
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics),
@@ -34,7 +37,8 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<IOStatistics> io_statistics)
SendRemb(bitrate_bps, ssrcs);
}),
clock_(Clock::GetRealTimeClockShared()) {
rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this);
SetPeriod(std::chrono::milliseconds(5));
// rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this);
#ifdef SAVE_RTP_RECV_STREAM
file_rtp_recv_ = fopen("rtp_recv_stream.h264", "w+b");
@@ -45,19 +49,18 @@ RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<IOStatistics> io_statistics)
}
RtpVideoReceiver::~RtpVideoReceiver() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
rtcp_stop_.store(true);
rtcp_cv_.notify_one();
rtcp_cv_.notify_all();
if (rtcp_thread_.joinable()) {
rtcp_thread_.join();
}
SSRCManager::Instance().DeleteSsrc(feedback_ssrc_);
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
#ifdef SAVE_RTP_RECV_STREAM
if (file_rtp_recv_) {
fflush(file_rtp_recv_);
@@ -469,12 +472,11 @@ bool RtpVideoReceiver::Process() {
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}
void RtpVideoReceiver::RtcpThread() {
while (!rtcp_stop_) {
while (!rtcp_stop_.load()) {
std::unique_lock<std::mutex> lock(rtcp_mtx_);
if (rtcp_cv_.wait_for(
lock, std::chrono::milliseconds(rtcp_tcc_interval_ms_),

View File

@@ -12,6 +12,7 @@ RtpVideoSender::RtpVideoSender() {}
RtpVideoSender::RtpVideoSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {
SetPeriod(std::chrono::milliseconds(5));
#ifdef SAVE_RTP_SENT_STREAM
file_rtp_sent_ = fopen("rtp_sent_stream.h264", "w+b");
if (!file_rtp_sent_) {
@@ -162,6 +163,5 @@ bool RtpVideoSender::Process() {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}

View File

@@ -44,7 +44,11 @@ void VideoChannelReceive::Initialize(rtp::PAYLOAD_TYPE payload_type) {
rtp_video_receiver_->Start();
}
void VideoChannelReceive::Destroy() {}
void VideoChannelReceive::Destroy() {
if (rtp_video_receiver_) {
rtp_video_receiver_->Stop();
}
}
int VideoChannelReceive::OnReceiveRtpPacket(const char *data, size_t size) {
if (ice_io_statistics_) {

View File

@@ -2,23 +2,34 @@
#include "log.h"
ThreadBase::ThreadBase() {}
ThreadBase::ThreadBase()
: running_(false), pause_(false), period_(std::chrono::milliseconds(100)) {}
ThreadBase::~ThreadBase() {
if (!stop_) {
Stop();
}
}
ThreadBase::~ThreadBase() { Stop(); }
void ThreadBase::Start() {
{
std::lock_guard<std::mutex> lock(cv_mtx_);
if (running_) {
return; // Already running
}
running_ = true;
}
std::thread t(&ThreadBase::Run, this);
thread_ = std::move(t);
stop_ = false;
}
void ThreadBase::Stop() {
{
std::lock_guard<std::mutex> lock(cv_mtx_);
if (!running_) {
return; // Already stopped
}
running_ = false;
}
cv_.notify_all();
if (thread_.joinable()) {
stop_ = true;
thread_.join();
}
}
@@ -27,7 +38,17 @@ void ThreadBase::Pause() { pause_ = true; }
void ThreadBase::Resume() { pause_ = false; }
void ThreadBase::SetPeriod(std::chrono::milliseconds period) {
std::lock_guard<std::mutex> lock(cv_mtx_);
period_ = period;
}
void ThreadBase::Run() {
while (!stop_ && Process()) {
while (running_) {
std::unique_lock<std::mutex> lock(cv_mtx_);
cv_.wait_for(lock, period_, [this] { return !running_; });
if (running_) {
Process();
}
}
}

View File

@@ -2,6 +2,9 @@
#define _THREAD_BASE_H_
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
class ThreadBase {
@@ -16,6 +19,8 @@ class ThreadBase {
void Pause();
void Resume();
void SetPeriod(std::chrono::milliseconds period);
virtual bool Process() = 0;
private:
@@ -23,9 +28,13 @@ class ThreadBase {
private:
std::thread thread_;
std::chrono::milliseconds period_;
std::atomic<bool> stop_{false};
std::atomic<bool> pause_{false};
std::condition_variable cv_;
std::mutex cv_mtx_;
std::atomic<bool> running_;
std::atomic<bool> pause_;
};
#endif

View File

@@ -423,20 +423,28 @@ int IceTransport::DestroyIceTransmission() {
ice_io_statistics_->Stop();
}
if (rtp_video_receiver_) {
rtp_video_receiver_->Stop();
if (video_channel_send_) {
video_channel_send_->Destroy();
}
if (rtp_video_sender_) {
rtp_video_sender_->Stop();
if (audio_channel_send_) {
audio_channel_send_->Destroy();
}
if (rtp_audio_sender_) {
rtp_audio_sender_->Stop();
if (data_channel_send_) {
data_channel_send_->Destroy();
}
if (rtp_data_sender_) {
rtp_data_sender_->Stop();
if (video_channel_receive_) {
video_channel_receive_->Destroy();
}
if (audio_channel_receive_) {
audio_channel_receive_->Destroy();
}
if (data_channel_receive_) {
data_channel_receive_->Destroy();
}
return ice_agent_->DestroyIceAgent();