From 0899fe2f1d32e53fb2966d2637bb48adbed40d48 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Mon, 11 Sep 2023 14:45:22 +0800 Subject: [PATCH] Fix crash caused by multi threads during program termination --- src/pc/peer_connection.cpp | 42 ++++++++++--------- src/pc/peer_connection.h | 21 +++++----- src/rtp/rtp_video_receiver.cpp | 59 ++++++++++++++------------- src/rtp/rtp_video_receiver.h | 19 +++++---- src/rtp/rtp_video_sender.cpp | 37 +++++++++-------- src/rtp/rtp_video_sender.h | 14 +++++-- src/thread/thread_base.cpp | 24 +++++++++++ src/thread/thread_base.h | 26 ++++++++++++ src/transmission/ice_transmission.cpp | 49 +++++++++++----------- src/transmission/ice_transmission.h | 22 +++++----- xmake.lua | 8 +++- 11 files changed, 199 insertions(+), 122 deletions(-) create mode 100644 src/thread/thread_base.cpp create mode 100644 src/thread/thread_base.h diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index 4444814..3a69607 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -59,12 +59,14 @@ int PeerConnection::Init(PeerConnectionParams params, on_ice_status_change_ = [this](std::string ice_status) { if ("JUICE_STATE_COMPLETED" == ice_status) { ice_ready_ = true; + LOG_INFO("Ice connected"); } else { ice_ready_ = false; + LOG_INFO("Ice not useable"); } }; - ws_transport_ = new WsTransmission(on_receive_ws_msg_); + ws_transport_ = std::make_shared(on_receive_ws_msg_); uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_; if (ws_transport_) { ws_transport_->Connect(uri_); @@ -149,9 +151,10 @@ void PeerConnection::ProcessSignal(const std::string &signal) { LOG_INFO("]"); for (auto &remote_user_id : user_id_list_) { - ice_transmission_list_[remote_user_id] = new IceTransmission( - true, transmission_id, user_id_, remote_user_id, ws_transport_, - on_receive_ice_msg_, on_ice_status_change_); + ice_transmission_list_[remote_user_id] = + std::make_unique( + true, transmission_id, user_id_, remote_user_id, ws_transport_, + on_receive_ice_msg_, on_ice_status_change_); ice_transmission_list_[remote_user_id]->InitIceTransmission( cfg_stun_server_ip_, stun_server_port_); ice_transmission_list_[remote_user_id]->JoinTransmission(); @@ -166,8 +169,6 @@ void PeerConnection::ProcessSignal(const std::string &signal) { auto user_id_it = ice_transmission_list_.find(user_id); if (user_id_it != ice_transmission_list_.end()) { user_id_it->second->DestroyIceTransmission(); - delete user_id_it->second; - user_id_it->second = nullptr; ice_transmission_list_.erase(user_id_it); LOG_INFO("Terminate transmission to user [{}]", user_id); } @@ -184,9 +185,10 @@ void PeerConnection::ProcessSignal(const std::string &signal) { std::string remote_user_id = j["remote_user_id"].get(); LOG_INFO("[{}] receive offer from [{}]", user_id_, remote_user_id); - ice_transmission_list_[remote_user_id] = new IceTransmission( - false, transmission_id, user_id_, remote_user_id, ws_transport_, - on_receive_ice_msg_, on_ice_status_change_); + ice_transmission_list_[remote_user_id] = + std::make_unique( + false, transmission_id, user_id_, remote_user_id, ws_transport_, + on_receive_ice_msg_, on_ice_status_change_); ice_transmission_list_[remote_user_id]->InitIceTransmission( cfg_stun_server_ip_, stun_server_port_); @@ -237,17 +239,19 @@ int PeerConnection::RequestTransmissionMemberList( return 0; } -int PeerConnection::Destroy() { - if (ws_transport_) { - delete ws_transport_; - } - return 0; -} +int PeerConnection::Destroy() { return 0; } SignalStatus PeerConnection::GetSignalStatus() { return signal_status_; } int PeerConnection::SendVideoData(const char *data, size_t size) { - if (!ice_ready_) return -1; + if (!ice_ready_) { + return -1; + } + + if (ice_transmission_list_.empty()) { + return -1; + } + int ret = Encode((uint8_t *)data, size); if (0 != ret) { LOG_ERROR("Encode failed"); @@ -261,7 +265,7 @@ int PeerConnection::SendVideoData(const char *data, size_t size) { } int PeerConnection::OnEncodedImage(char *encoded_packets, size_t size) { - for (auto ice_trans : ice_transmission_list_) { + for (auto &ice_trans : ice_transmission_list_) { LOG_ERROR("H264 frame size: [{}]", size); ice_trans.second->SendData(encoded_packets, size); } @@ -270,14 +274,14 @@ int PeerConnection::OnEncodedImage(char *encoded_packets, size_t size) { } int PeerConnection::SendAudioData(const char *data, size_t size) { - for (auto ice_trans : ice_transmission_list_) { + for (auto &ice_trans : ice_transmission_list_) { ice_trans.second->SendData(data, size); } return 0; } int PeerConnection::SendUserData(const char *data, size_t size) { - for (auto ice_trans : ice_transmission_list_) { + for (auto &ice_trans : ice_transmission_list_) { ice_trans.second->SendData(data, size); } return 0; diff --git a/src/pc/peer_connection.h b/src/pc/peer_connection.h index 5808d75..41e0dc5 100644 --- a/src/pc/peer_connection.h +++ b/src/pc/peer_connection.h @@ -62,25 +62,26 @@ class PeerConnection : public VideoEncoder, VideoDecoder { std::string cfg_stun_server_port_; int signal_server_port_ = 0; int stun_server_port_ = 0; - WsTransmission *ws_transport_ = nullptr; - IceTransmission *ice_transmission_ = nullptr; - std::map ice_transmission_list_; - std::function on_receive_ws_msg_ = nullptr; - std::function - on_receive_ice_msg_ = nullptr; - std::function on_ice_status_change_ = nullptr; - bool ice_ready_ = false; + private: + std::shared_ptr ws_transport_ = nullptr; + std::function on_receive_ws_msg_ = nullptr; unsigned int ws_connection_id_ = 0; std::string user_id_ = ""; std::string transmission_id_ = ""; std::vector user_id_list_; SignalStatus signal_status_ = SignalStatus::Closed; + private: + std::map> + ice_transmission_list_; + std::function + on_receive_ice_msg_ = nullptr; + std::function on_ice_status_change_ = nullptr; + bool ice_ready_ = false; + OnReceiveBuffer on_receive_buffer_; char *nv12_data_ = nullptr; - - private: }; #endif \ No newline at end of file diff --git a/src/rtp/rtp_video_receiver.cpp b/src/rtp/rtp_video_receiver.cpp index 3a3ea04..d9927e3 100644 --- a/src/rtp/rtp_video_receiver.cpp +++ b/src/rtp/rtp_video_receiver.cpp @@ -6,19 +6,9 @@ RtpVideoReceiver::RtpVideoReceiver() {} -RtpVideoReceiver::~RtpVideoReceiver() { - if (jitter_thread_ && jitter_thread_->joinable()) { - jitter_thread_->join(); - delete jitter_thread_; - jitter_thread_ = nullptr; - } -} +RtpVideoReceiver::~RtpVideoReceiver() {} void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { - if (!jitter_thread_) { - jitter_thread_ = new std::thread(&RtpVideoReceiver::Process, this); - } - if (NAL_UNIT_TYPE::NALU == rtp_packet.NalUnitType()) { compelete_video_frame_queue_.push( VideoFrame(rtp_packet.Payload(), rtp_packet.Size())); @@ -98,23 +88,36 @@ bool RtpVideoReceiver::CheckIsFrameCompleted(RtpPacket& rtp_packet) { return false; } -void RtpVideoReceiver::Process() { - while (1) { - if (!compelete_video_frame_queue_.isEmpty()) { - VideoFrame video_frame; - compelete_video_frame_queue_.pop(video_frame); - if (on_receive_complete_frame_) { - auto now_complete_frame_ts = std::chrono::high_resolution_clock::now() - .time_since_epoch() - .count() / - 1000000; - uint32_t duration = now_complete_frame_ts - last_complete_frame_ts_; - LOG_ERROR("Duration {}", 1000 / duration); - last_complete_frame_ts_ = now_complete_frame_ts; - on_receive_complete_frame_(video_frame); - } - } +void RtpVideoReceiver::Start() { + std::lock_guard lock_guard(mutex_); + stop_ = false; +} - std::this_thread::sleep_for(std::chrono::milliseconds(13)); +void RtpVideoReceiver::Stop() { + std::lock_guard lock_guard(mutex_); + stop_ = true; +} + +bool RtpVideoReceiver::Process() { + std::lock_guard lock_guard(mutex_); + if (stop_) { + return false; } + + if (!compelete_video_frame_queue_.isEmpty()) { + VideoFrame video_frame; + compelete_video_frame_queue_.pop(video_frame); + if (on_receive_complete_frame_) { + auto now_complete_frame_ts = + std::chrono::high_resolution_clock::now().time_since_epoch().count() / + 1000000; + uint32_t duration = now_complete_frame_ts - last_complete_frame_ts_; + LOG_ERROR("Duration {}", 1000 / duration); + last_complete_frame_ts_ = now_complete_frame_ts; + on_receive_complete_frame_(video_frame); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(13)); + return true; } \ No newline at end of file diff --git a/src/rtp/rtp_video_receiver.h b/src/rtp/rtp_video_receiver.h index 2ba99fb..6a70195 100644 --- a/src/rtp/rtp_video_receiver.h +++ b/src/rtp/rtp_video_receiver.h @@ -3,14 +3,15 @@ #include #include +#include #include -#include #include "frame.h" #include "ringbuffer.h" #include "rtp_video_session.h" +#include "thread_base.h" -class RtpVideoReceiver { +class RtpVideoReceiver : public ThreadBase { public: RtpVideoReceiver(); ~RtpVideoReceiver(); @@ -23,13 +24,16 @@ class RtpVideoReceiver { on_receive_complete_frame_ = on_receive_complete_frame; } + void Start(); + void Stop(); + private: bool CheckIsFrameCompleted(RtpPacket& rtp_packet); - void Process(); - - // private: // void OnReceiveFrame(uint8_t* payload) {} + private: + bool Process() override; + private: std::map incomplete_frame_list_; uint8_t* nv12_data_ = nullptr; @@ -37,8 +41,9 @@ class RtpVideoReceiver { uint32_t last_complete_frame_ts_ = 0; RingBuffer compelete_video_frame_queue_; - std::thread* jitter_thread_ = nullptr; - bool start_ = false; + + bool stop_ = true; + std::mutex mutex_; }; #endif diff --git a/src/rtp/rtp_video_sender.cpp b/src/rtp/rtp_video_sender.cpp index f229cdb..e01a2de 100644 --- a/src/rtp/rtp_video_sender.cpp +++ b/src/rtp/rtp_video_sender.cpp @@ -2,29 +2,35 @@ #include +#include "log.h" + RtpVideoSender::RtpVideoSender() {} -RtpVideoSender::~RtpVideoSender() { - if (send_thread_ && send_thread_->joinable()) { - send_thread_->join(); - delete send_thread_; - send_thread_ = nullptr; - } -} +RtpVideoSender::~RtpVideoSender() {} void RtpVideoSender::Enqueue(std::vector& rtp_packets) { - if (!send_thread_) { - send_thread_ = new std::thread(&RtpVideoSender::Process, this); - } - for (auto& rtp_packet : rtp_packets) { - start_ = true; rtp_packe_queue_.push(rtp_packet); } } -void RtpVideoSender::Process() { - while (1) { +void RtpVideoSender::Start() { + std::lock_guard lock_guard(mutex_); + stop_ = false; +} + +void RtpVideoSender::Stop() { + std::lock_guard lock_guard(mutex_); + stop_ = true; +} + +bool RtpVideoSender::Process() { + std::lock_guard lock_guard(mutex_); + if (stop_) { + return false; + } + + for (size_t i = 0; i < 50; i++) if (!rtp_packe_queue_.isEmpty()) { RtpPacket rtp_packet; rtp_packe_queue_.pop(rtp_packet); @@ -33,6 +39,5 @@ void RtpVideoSender::Process() { } } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } + return true; } \ No newline at end of file diff --git a/src/rtp/rtp_video_sender.h b/src/rtp/rtp_video_sender.h index 73e4ded..0bfe45c 100644 --- a/src/rtp/rtp_video_sender.h +++ b/src/rtp/rtp_video_sender.h @@ -2,12 +2,14 @@ #define _RTP_VIDEO_SENDER_H_ #include +#include #include #include "ringbuffer.h" #include "rtp_packet.h" +#include "thread_base.h" -class RtpVideoSender { +class RtpVideoSender : public ThreadBase { public: RtpVideoSender(); ~RtpVideoSender(); @@ -21,14 +23,18 @@ class RtpVideoSender { rtp_packet_send_func_ = rtp_packet_send_func; } + void Start(); + void Stop(); + private: - void Process(); + bool Process() override; private: std::function rtp_packet_send_func_ = nullptr; RingBuffer rtp_packe_queue_; - std::thread *send_thread_ = nullptr; - bool start_ = false; + + bool stop_ = true; + std::mutex mutex_; }; #endif \ No newline at end of file diff --git a/src/thread/thread_base.cpp b/src/thread/thread_base.cpp new file mode 100644 index 0000000..fce73a0 --- /dev/null +++ b/src/thread/thread_base.cpp @@ -0,0 +1,24 @@ +#include "thread_base.h" + +#include "log.h" + +ThreadBase::ThreadBase() {} + +ThreadBase::~ThreadBase() {} + +void ThreadBase::StartThread() { + if (!thread_) { + thread_ = std::make_unique(&ThreadBase::Run, this); + } +} + +void ThreadBase::StopThread() { + if (thread_ && thread_->joinable()) { + thread_->join(); + } +} + +void ThreadBase::Run() { + while (Process()) { + } +} \ No newline at end of file diff --git a/src/thread/thread_base.h b/src/thread/thread_base.h new file mode 100644 index 0000000..fe41ee6 --- /dev/null +++ b/src/thread/thread_base.h @@ -0,0 +1,26 @@ +#ifndef _THREAD_BASE_H_ +#define _THREAD_BASE_H_ + +#include +#include + +class ThreadBase { + public: + ThreadBase(); + ~ThreadBase(); + + public: + void StartThread(); + void StopThread(); + virtual bool Process() = 0; + + private: + void Run(); + + private: + std::unique_ptr thread_ = nullptr; + bool start_ = false; + std::mutex mutex_; +}; + +#endif \ No newline at end of file diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index 0f561fb..9347d09 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -18,7 +18,8 @@ const std::vector ice_status = { IceTransmission::IceTransmission( bool offer_peer, std::string &transmission_id, std::string &user_id, - std::string &remote_user_id, WsTransmission *ice_ws_transmission, + std::string &remote_user_id, + std::shared_ptr ice_ws_transmission, std::function on_receive_ice_msg, std::function on_ice_status_change) @@ -31,36 +32,25 @@ IceTransmission::IceTransmission( on_ice_status_change_(on_ice_status_change) {} IceTransmission::~IceTransmission() { - if (kcp_update_thread_ && kcp_update_thread_->joinable()) { - kcp_update_thread_->join(); - delete kcp_update_thread_; - kcp_update_thread_ = nullptr; - } - - if (rtp_video_session_) { - delete rtp_video_session_; - rtp_video_session_ = nullptr; + if (rtp_video_sender_) { + rtp_video_sender_->Stop(); + rtp_video_sender_->StopThread(); } if (rtp_video_receiver_) { - delete rtp_video_receiver_; - rtp_video_receiver_ = nullptr; + rtp_video_receiver_->Stop(); + rtp_video_receiver_->StopThread(); } if (rtp_payload_) { delete rtp_payload_; rtp_payload_ = nullptr; } - - if (ice_agent_) { - delete ice_agent_; - ice_agent_ = nullptr; - } } int IceTransmission::InitIceTransmission(std::string &ip, int port) { - rtp_video_session_ = new RtpVideoSession(PAYLOAD_TYPE::H264); - rtp_video_receiver_ = new RtpVideoReceiver(); + rtp_video_session_ = std::make_unique(PAYLOAD_TYPE::H264); + rtp_video_receiver_ = std::make_unique(); rtp_video_receiver_->SetOnReceiveCompleteFrame( [this](VideoFrame &video_frame) -> void { LOG_ERROR("OnReceiveCompleteFrame {}", video_frame.Size()); @@ -69,16 +59,21 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { remote_user_id_.size()); }); - rtp_video_sender_ = new RtpVideoSender(); + rtp_video_receiver_->StartThread(); + rtp_video_receiver_->Start(); + + rtp_video_sender_ = std::make_unique(); rtp_video_sender_->SetRtpPacketSendFunc([this]( RtpPacket &rtp_packet) -> void { if (ice_agent_) { - LOG_ERROR("Send rtp packet {}", rtp_packet.Size()); ice_agent_->Send((const char *)rtp_packet.Buffer(), rtp_packet.Size()); } }); - ice_agent_ = new IceAgent(ip, port); + rtp_video_sender_->StartThread(); + rtp_video_sender_->Start(); + + ice_agent_ = std::make_unique(ip, port); ice_agent_->CreateIceAgent( [](juice_agent_t *agent, juice_state_t state, void *user_ptr) { @@ -136,7 +131,6 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { int IceTransmission::DestroyIceTransmission() { LOG_INFO("[{}->{}] Destroy ice transmission", user_id_, remote_user_id_); - kcp_stop_ = true; return ice_agent_->DestoryIceAgent(); } @@ -221,8 +215,13 @@ int IceTransmission::SendData(const char *data, size_t size) { if (JUICE_STATE_COMPLETED == state_) { std::vector packets; - rtp_video_session_->Encode((uint8_t *)data, size, packets); - rtp_video_sender_->Enqueue(packets); + if (rtp_video_session_) { + rtp_video_session_->Encode((uint8_t *)data, size, packets); + } + if (rtp_video_sender_) { + rtp_video_sender_->Enqueue(packets); + } + // for (auto &packet : packets) { // ice_agent_->Send((const char *)packet.Buffer(), packet.Size()); // } diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h index e04724f..ee1a4e5 100644 --- a/src/transmission/ice_transmission.h +++ b/src/transmission/ice_transmission.h @@ -16,7 +16,8 @@ class IceTransmission { public: IceTransmission( bool offer_peer, std::string &transmission_id, std::string &user_id, - std::string &remote_user_id, WsTransmission *ice_ws_transmission, + std::string &remote_user_id, + std::shared_ptr ice_ws_transmission, std::function on_receive_ice_msg, std::function on_ice_status_change); @@ -52,8 +53,8 @@ class IceTransmission { int SendAnswer(); private: - IceAgent *ice_agent_ = nullptr; - WsTransmission *ice_ws_transport_ = nullptr; + std::unique_ptr ice_agent_ = nullptr; + std::shared_ptr ice_ws_transport_ = nullptr; CongestionControl *congestion_control_ = nullptr; std::function on_receive_ice_msg_cb_ = nullptr; @@ -71,17 +72,14 @@ class IceTransmission { juice_state_t state_ = JUICE_STATE_DISCONNECTED; private: - // ikcpcb *kcp_ = nullptr; - char kcp_complete_buffer_[2560 * 1440 * 4]; - bool kcp_stop_ = false; - std::thread *kcp_update_thread_ = nullptr; - - private: - RtpVideoSession *rtp_video_session_ = nullptr; - RtpVideoReceiver *rtp_video_receiver_ = nullptr; - RtpVideoSender *rtp_video_sender_ = nullptr; + std::unique_ptr rtp_video_session_ = nullptr; + std::unique_ptr rtp_video_receiver_ = nullptr; + std::unique_ptr rtp_video_sender_ = nullptr; uint8_t *rtp_payload_ = nullptr; RtpPacket pop_packet_; + bool start_send_packet_ = false; + + uint32_t last_complete_frame_ts_ = 0; }; #endif \ No newline at end of file diff --git a/xmake.lua b/xmake.lua index 60b905d..fa1a726 100644 --- a/xmake.lua +++ b/xmake.lua @@ -48,6 +48,12 @@ target("ringbuffer") set_kind("headeronly") add_includedirs("src/ringbuffer", {public = true}) +target("thread") + set_kind("static") + add_deps("log") + add_files("src/thread/*.cpp") + add_includedirs("src/thread", {public = true}) + target("frame") set_kind("static") add_files("src/frame/*.cpp") @@ -55,7 +61,7 @@ target("frame") target("rtp") set_kind("static") - add_deps("log", "frame", "ringbuffer") + add_deps("log", "frame", "ringbuffer", "thread") add_files("src/rtp/*.cpp") add_includedirs("src/rtp", {public = true})