diff --git a/src/ice/ice_agent.cpp b/src/ice/ice_agent.cpp index 824235e..fb57b65 100644 --- a/src/ice/ice_agent.cpp +++ b/src/ice/ice_agent.cpp @@ -47,7 +47,7 @@ int IceAgent::CreateIceAgent(nice_cb_state_changed_t on_state_changed, exit_nice_thread_ = false; - nice_thread_.reset(new std::thread([this]() { + nice_thread_ = std::thread([this]() { gloop_ = g_main_loop_new(nullptr, false); agent_ = nice_agent_new_full( @@ -97,10 +97,9 @@ int IceAgent::CreateIceAgent(nice_cb_state_changed_t on_state_changed, user_ptr_); nice_inited_ = true; - g_main_loop_run(gloop_); exit_nice_thread_ = true; - })); + }); do { g_usleep(1000); @@ -130,8 +129,8 @@ int IceAgent::DestroyIceAgent() { destroyed_ = true; g_main_loop_quit(gloop_); - if (nice_thread_->joinable()) { - nice_thread_->join(); + if (nice_thread_.joinable()) { + nice_thread_.join(); } LOG_INFO("Destroy nice agent success"); diff --git a/src/ice/ice_agent.h b/src/ice/ice_agent.h index 1f18a68..60fa921 100644 --- a/src/ice/ice_agent.h +++ b/src/ice/ice_agent.h @@ -66,7 +66,7 @@ class IceAgent { std::string turn_username_ = ""; std::string turn_password_ = ""; - std::unique_ptr nice_thread_; + std::thread nice_thread_; std::atomic agent_{nullptr}; std::atomic gloop_{nullptr}; std::atomic nice_inited_{false}; diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index 7443120..f3057a6 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -182,12 +182,29 @@ int PeerConnection::Init(PeerConnectionParams params, LOG_INFO("Ice finish"); } else if ("closed" == ice_status) { ice_ready_ = false; - on_connection_status_(ConnectionStatus::Closed, user_data_); - LOG_INFO("Ice closed"); + if (!try_rejoin_with_turn_) { + on_connection_status_(ConnectionStatus::Closed, user_data_); + LOG_INFO("Ice closed"); + } } else if ("failed" == ice_status) { ice_ready_ = false; - on_connection_status_(ConnectionStatus::Failed, user_data_); - enable_turn_ = true; + try_rejoin_with_turn_ = true; + if (try_rejoin_with_turn_) { + enable_turn_ = true; + LOG_INFO("Ice failed, destroy ice agent"); + + IceWorkMsg msg; + msg.type = IceWorkMsg::Type::Destroy; + PushIceWorkMsg(msg); + + LOG_INFO("Create ice agent with TURN"); + msg.type = IceWorkMsg::Type::UserIdList; + msg.transmission_id = remote_transmission_id_; + msg.user_id_list = user_id_list_; + PushIceWorkMsg(msg); + } else { + LOG_INFO("Unknown ice state"); + } } else { ice_ready_ = false; } @@ -376,7 +393,7 @@ int PeerConnection::Join(const std::string &transmission_id, leave_ = false; remote_transmission_id_ = transmission_id; - ret = RequestTransmissionMemberList(remote_transmission_id_, password); + ret = RequestTransmissionMemberList(remote_transmission_id_, password_); return ret; } @@ -647,25 +664,37 @@ void PeerConnection::ProcessSignal(const std::string &signal) { } void PeerConnection::StartIceWorker() { - ice_worker_.reset(new std::thread([this]() { - while (ice_worker_running_) { + LOG_INFO("Start ice worker"); + ice_worker_ = std::thread([this]() { + while (true) { std::unique_lock lck(ice_work_mutex_); - while (ice_work_msg_queue_.empty()) { - ice_work_cv_.wait(lck, [this] { return !ice_work_msg_queue_.empty(); }); + while (ice_work_msg_queue_.empty() && ice_worker_running_) { + ice_work_cv_.wait(lck, [this] { + return !ice_work_msg_queue_.empty() || !ice_worker_running_; + }); } + + if (!ice_worker_running_) { + LOG_INFO("Exit ice worker"); + break; + } + IceWorkMsg msg = ice_work_msg_queue_.front(); ice_work_msg_queue_.pop(); + lck.unlock(); ProcessIceWorkMsg(msg); } std::queue empty_queue; std::swap(ice_work_msg_queue_, empty_queue); - })); + }); } void PeerConnection::StopIceWorker() { + LOG_INFO("Stop ice worker"); ice_worker_running_ = false; - if (ice_worker_->joinable()) { - ice_worker_->join(); + ice_work_cv_.notify_one(); + if (ice_worker_.joinable()) { + ice_worker_.join(); } } diff --git a/src/pc/peer_connection.h b/src/pc/peer_connection.h index a274310..f20d673 100644 --- a/src/pc/peer_connection.h +++ b/src/pc/peer_connection.h @@ -129,6 +129,7 @@ class PeerConnection { bool enable_turn_ = false; bool trickle_ice_ = true; TraversalMode mode_ = TraversalMode::P2P; + bool try_rejoin_with_turn_ = false; private: std::shared_ptr ws_transport_ = nullptr; @@ -187,7 +188,7 @@ class PeerConnection { bool audio_codec_inited_ = false; private: - std::unique_ptr ice_worker_ = nullptr; + std::thread ice_worker_; std::atomic ice_worker_running_{true}; std::queue ice_work_msg_queue_; std::condition_variable ice_work_cv_; diff --git a/src/rtc/x_inner.cpp b/src/rtc/x_inner.cpp index b9fdedd..dcdb8e3 100644 --- a/src/rtc/x_inner.cpp +++ b/src/rtc/x_inner.cpp @@ -90,8 +90,8 @@ int LeaveConnection(PeerPtr *peer_ptr, const char *transmission_id) { return -1; } - peer_ptr->peer_connection->Leave(transmission_id); LOG_INFO("LeaveConnection"); + peer_ptr->peer_connection->Leave(transmission_id); return 0; } diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index 247e1a4..d4c4321 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -227,7 +227,8 @@ int IceTransmission::InitIceTransmission( if (user_ptr) { IceTransmission *ice_transmission_obj = static_cast(user_ptr); - LOG_INFO("[{}] gather_done", ice_transmission_obj->user_id_); + LOG_INFO("[{}->{}] gather_done", ice_transmission_obj->user_id_, + ice_transmission_obj->remote_user_id_); if (!ice_transmission_obj->trickle_ice_) { if (ice_transmission_obj->offer_peer_) { @@ -323,7 +324,7 @@ int IceTransmission::GatherCandidates() { int IceTransmission::SetRemoteSdp(const std::string &remote_sdp) { ice_agent_->SetRemoteSdp(remote_sdp.c_str()); - LOG_INFO("[{}] set remote sdp", user_id_); + // LOG_INFO("[{}] set remote sdp", user_id_); remote_ice_username_ = GetIceUsername(remote_sdp); return 0; } diff --git a/tests/opus/OpusEncoderImpl.cpp b/tests/opus/OpusEncoderImpl.cpp index b0240b5..de19ac1 100644 --- a/tests/opus/OpusEncoderImpl.cpp +++ b/tests/opus/OpusEncoderImpl.cpp @@ -56,7 +56,7 @@ bool OpusEncoderImpl::PopFrame(StreamInfo &info) { // 48000 sample rate٬48 samples/ms * 20ms * 2 channel = 1920 void OpusEncoderImpl::EncodeRun() { - m_thread = std::make_unique([this]() { + m_thread = std::thread([this]() { const int frame_size = 48 * 20; // 1920 int input_len = sizeof(opus_int16) * frame_size * channel_num; diff --git a/tests/opus/OpusEncoderImpl.h b/tests/opus/OpusEncoderImpl.h index baafe9d..0573475 100644 --- a/tests/opus/OpusEncoderImpl.h +++ b/tests/opus/OpusEncoderImpl.h @@ -20,7 +20,7 @@ class OpusEncoderImpl { std::mutex mutex; bool isRuning = true; std::mutex access_mutex; - std::unique_ptr m_thread; + std::thread > m_thread; OpusDecoderImpl *decoder = nullptr;