[fix] solve deadlock caused by destroy ice agent

This commit is contained in:
dijunkun
2024-09-02 16:31:33 +08:00
parent 0b11646619
commit 98bd477af5
8 changed files with 54 additions and 24 deletions

View File

@@ -47,7 +47,7 @@ int IceAgent::CreateIceAgent(nice_cb_state_changed_t on_state_changed,
exit_nice_thread_ = false; exit_nice_thread_ = false;
nice_thread_.reset(new std::thread([this]() { nice_thread_ = std::thread([this]() {
gloop_ = g_main_loop_new(nullptr, false); gloop_ = g_main_loop_new(nullptr, false);
agent_ = nice_agent_new_full( agent_ = nice_agent_new_full(
@@ -97,10 +97,9 @@ int IceAgent::CreateIceAgent(nice_cb_state_changed_t on_state_changed,
user_ptr_); user_ptr_);
nice_inited_ = true; nice_inited_ = true;
g_main_loop_run(gloop_); g_main_loop_run(gloop_);
exit_nice_thread_ = true; exit_nice_thread_ = true;
})); });
do { do {
g_usleep(1000); g_usleep(1000);
@@ -130,8 +129,8 @@ int IceAgent::DestroyIceAgent() {
destroyed_ = true; destroyed_ = true;
g_main_loop_quit(gloop_); g_main_loop_quit(gloop_);
if (nice_thread_->joinable()) { if (nice_thread_.joinable()) {
nice_thread_->join(); nice_thread_.join();
} }
LOG_INFO("Destroy nice agent success"); LOG_INFO("Destroy nice agent success");

View File

@@ -66,7 +66,7 @@ class IceAgent {
std::string turn_username_ = ""; std::string turn_username_ = "";
std::string turn_password_ = ""; std::string turn_password_ = "";
std::unique_ptr<std::thread> nice_thread_; std::thread nice_thread_;
std::atomic<NiceAgent*> agent_{nullptr}; std::atomic<NiceAgent*> agent_{nullptr};
std::atomic<GMainLoop*> gloop_{nullptr}; std::atomic<GMainLoop*> gloop_{nullptr};
std::atomic<bool> nice_inited_{false}; std::atomic<bool> nice_inited_{false};

View File

@@ -182,12 +182,29 @@ int PeerConnection::Init(PeerConnectionParams params,
LOG_INFO("Ice finish"); LOG_INFO("Ice finish");
} else if ("closed" == ice_status) { } else if ("closed" == ice_status) {
ice_ready_ = false; ice_ready_ = false;
if (!try_rejoin_with_turn_) {
on_connection_status_(ConnectionStatus::Closed, user_data_); on_connection_status_(ConnectionStatus::Closed, user_data_);
LOG_INFO("Ice closed"); LOG_INFO("Ice closed");
}
} else if ("failed" == ice_status) { } else if ("failed" == ice_status) {
ice_ready_ = false; ice_ready_ = false;
on_connection_status_(ConnectionStatus::Failed, user_data_); try_rejoin_with_turn_ = true;
if (try_rejoin_with_turn_) {
enable_turn_ = true; enable_turn_ = true;
LOG_INFO("Ice failed, destroy ice agent");
IceWorkMsg msg;
msg.type = IceWorkMsg::Type::Destroy;
PushIceWorkMsg(msg);
LOG_INFO("Create ice agent with TURN");
msg.type = IceWorkMsg::Type::UserIdList;
msg.transmission_id = remote_transmission_id_;
msg.user_id_list = user_id_list_;
PushIceWorkMsg(msg);
} else {
LOG_INFO("Unknown ice state");
}
} else { } else {
ice_ready_ = false; ice_ready_ = false;
} }
@@ -376,7 +393,7 @@ int PeerConnection::Join(const std::string &transmission_id,
leave_ = false; leave_ = false;
remote_transmission_id_ = transmission_id; remote_transmission_id_ = transmission_id;
ret = RequestTransmissionMemberList(remote_transmission_id_, password); ret = RequestTransmissionMemberList(remote_transmission_id_, password_);
return ret; return ret;
} }
@@ -647,25 +664,37 @@ void PeerConnection::ProcessSignal(const std::string &signal) {
} }
void PeerConnection::StartIceWorker() { void PeerConnection::StartIceWorker() {
ice_worker_.reset(new std::thread([this]() { LOG_INFO("Start ice worker");
while (ice_worker_running_) { ice_worker_ = std::thread([this]() {
while (true) {
std::unique_lock<std::mutex> lck(ice_work_mutex_); std::unique_lock<std::mutex> lck(ice_work_mutex_);
while (ice_work_msg_queue_.empty()) { while (ice_work_msg_queue_.empty() && ice_worker_running_) {
ice_work_cv_.wait(lck, [this] { return !ice_work_msg_queue_.empty(); }); ice_work_cv_.wait(lck, [this] {
return !ice_work_msg_queue_.empty() || !ice_worker_running_;
});
} }
if (!ice_worker_running_) {
LOG_INFO("Exit ice worker");
break;
}
IceWorkMsg msg = ice_work_msg_queue_.front(); IceWorkMsg msg = ice_work_msg_queue_.front();
ice_work_msg_queue_.pop(); ice_work_msg_queue_.pop();
lck.unlock();
ProcessIceWorkMsg(msg); ProcessIceWorkMsg(msg);
} }
std::queue<IceWorkMsg> empty_queue; std::queue<IceWorkMsg> empty_queue;
std::swap(ice_work_msg_queue_, empty_queue); std::swap(ice_work_msg_queue_, empty_queue);
})); });
} }
void PeerConnection::StopIceWorker() { void PeerConnection::StopIceWorker() {
LOG_INFO("Stop ice worker");
ice_worker_running_ = false; ice_worker_running_ = false;
if (ice_worker_->joinable()) { ice_work_cv_.notify_one();
ice_worker_->join(); if (ice_worker_.joinable()) {
ice_worker_.join();
} }
} }

View File

@@ -129,6 +129,7 @@ class PeerConnection {
bool enable_turn_ = false; bool enable_turn_ = false;
bool trickle_ice_ = true; bool trickle_ice_ = true;
TraversalMode mode_ = TraversalMode::P2P; TraversalMode mode_ = TraversalMode::P2P;
bool try_rejoin_with_turn_ = false;
private: private:
std::shared_ptr<WsTransmission> ws_transport_ = nullptr; std::shared_ptr<WsTransmission> ws_transport_ = nullptr;
@@ -187,7 +188,7 @@ class PeerConnection {
bool audio_codec_inited_ = false; bool audio_codec_inited_ = false;
private: private:
std::unique_ptr<std::thread> ice_worker_ = nullptr; std::thread ice_worker_;
std::atomic<bool> ice_worker_running_{true}; std::atomic<bool> ice_worker_running_{true};
std::queue<IceWorkMsg> ice_work_msg_queue_; std::queue<IceWorkMsg> ice_work_msg_queue_;
std::condition_variable ice_work_cv_; std::condition_variable ice_work_cv_;

View File

@@ -90,8 +90,8 @@ int LeaveConnection(PeerPtr *peer_ptr, const char *transmission_id) {
return -1; return -1;
} }
peer_ptr->peer_connection->Leave(transmission_id);
LOG_INFO("LeaveConnection"); LOG_INFO("LeaveConnection");
peer_ptr->peer_connection->Leave(transmission_id);
return 0; return 0;
} }

View File

@@ -227,7 +227,8 @@ int IceTransmission::InitIceTransmission(
if (user_ptr) { if (user_ptr) {
IceTransmission *ice_transmission_obj = IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user_ptr); static_cast<IceTransmission *>(user_ptr);
LOG_INFO("[{}] gather_done", ice_transmission_obj->user_id_); LOG_INFO("[{}->{}] gather_done", ice_transmission_obj->user_id_,
ice_transmission_obj->remote_user_id_);
if (!ice_transmission_obj->trickle_ice_) { if (!ice_transmission_obj->trickle_ice_) {
if (ice_transmission_obj->offer_peer_) { if (ice_transmission_obj->offer_peer_) {
@@ -323,7 +324,7 @@ int IceTransmission::GatherCandidates() {
int IceTransmission::SetRemoteSdp(const std::string &remote_sdp) { int IceTransmission::SetRemoteSdp(const std::string &remote_sdp) {
ice_agent_->SetRemoteSdp(remote_sdp.c_str()); ice_agent_->SetRemoteSdp(remote_sdp.c_str());
LOG_INFO("[{}] set remote sdp", user_id_); // LOG_INFO("[{}] set remote sdp", user_id_);
remote_ice_username_ = GetIceUsername(remote_sdp); remote_ice_username_ = GetIceUsername(remote_sdp);
return 0; return 0;
} }

View File

@@ -56,7 +56,7 @@ bool OpusEncoderImpl::PopFrame(StreamInfo &info) {
// 48000 sample rate<74><65>48 samples/ms * 20ms * 2 channel = 1920 // 48000 sample rate<74><65>48 samples/ms * 20ms * 2 channel = 1920
void OpusEncoderImpl::EncodeRun() { void OpusEncoderImpl::EncodeRun() {
m_thread = std::make_unique<std::thread>([this]() { m_thread = std::thread([this]() {
const int frame_size = 48 * 20; // 1920 const int frame_size = 48 * 20; // 1920
int input_len = sizeof(opus_int16) * frame_size * channel_num; int input_len = sizeof(opus_int16) * frame_size * channel_num;

View File

@@ -20,7 +20,7 @@ class OpusEncoderImpl {
std::mutex mutex; std::mutex mutex;
bool isRuning = true; bool isRuning = true;
std::mutex access_mutex; std::mutex access_mutex;
std::unique_ptr<std::thread> m_thread; std::thread > m_thread;
OpusDecoderImpl *decoder = nullptr; OpusDecoderImpl *decoder = nullptr;